Skip to content

Commit

Permalink
Merge pull request #391 from liftbridge-io/change_auto_pause
Browse files Browse the repository at this point in the history
Allow cursors.stream.auto.pause.time to be changed
  • Loading branch information
tylertreat authored Mar 11, 2022
2 parents 73ec0f3 + 45c05c6 commit edc316d
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 12 deletions.
25 changes: 18 additions & 7 deletions server/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
client "github.com/liftbridge-io/liftbridge-api/go"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

proto "github.com/liftbridge-io/liftbridge/server/protocol"
)
Expand Down Expand Up @@ -209,13 +208,25 @@ func (a *activityManager) handleRaftLog(l *raft.Log) error {
// createActivityStream creates the activity stream and connects a local client
// that will be subscribed to it.
func (a *activityManager) createActivityStream() error {
_, err := a.api.CreateStream(context.Background(), &client.CreateStreamRequest{
Subject: a.getActivityStreamSubject(),
Name: activityStream,
ReplicationFactor: -1,
status := a.metadata.CreateStream(context.Background(), &proto.CreateStreamOp{
Stream: &proto.Stream{
Name: activityStream,
Subject: a.getActivityStreamSubject(),
Partitions: []*proto.Partition{
{
Stream: activityStream,
Subject: a.getActivityStreamSubject(),
ReplicationFactor: -1,
Id: 0,
},
},
},
})
if err != nil && status.Convert(err).Code() != codes.AlreadyExists {
return errors.Wrap(err, "failed to create an activity stream")
if status == nil {
return nil
}
if status.Code() != codes.AlreadyExists {
return errors.Wrap(status.Err(), "failed to create activity stream")
}

return nil
Expand Down
19 changes: 19 additions & 0 deletions server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (a *apiServer) CreateStream(ctx context.Context, req *client.CreateStreamRe
a.logger.Errorf("api: Failed to create stream: subject is invalid")
return nil, status.Error(codes.InvalidArgument, "Subject is invalid")
}
if isReservedStream(req.Name) {
a.logger.Errorf("api: Failed to create stream: stream is reserved")
return nil, status.Error(codes.InvalidArgument, "Stream is reserved")
}

partitions := make([]*proto.Partition, req.Partitions)
for i := int32(0); i < req.Partitions; i++ {
Expand Down Expand Up @@ -98,6 +102,11 @@ func (a *apiServer) DeleteStream(ctx context.Context, req *client.DeleteStreamRe
a.logger.Debugf("api: DeleteStream [name=%s]",
req.Name)

if isReservedStream(req.Name) {
a.logger.Errorf("api: Failed to delete stream: stream is reserved")
return nil, status.Error(codes.InvalidArgument, "Stream is reserved")
}

if e := a.metadata.DeleteStream(ctx, &proto.DeleteStreamOp{
Stream: req.Name,
}); e != nil {
Expand Down Expand Up @@ -1087,3 +1096,13 @@ func (p *publishAsyncSession) close() {
p.sub.Unsubscribe()
}
}

// isReservedStream indicates if the provided stream name is a reserved stream.
func isReservedStream(stream string) bool {
for _, reserved := range reservedStreams {
if stream == reserved {
return true
}
}
return false
}
13 changes: 11 additions & 2 deletions server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func assertMsg(t *testing.T, expected *message, msg *lift.Message) {
}

// Ensure creating a stream works, and it returns an error when creating the
// same stream.
// same stream or creating a stream that is reserved.
func TestCreateStream(t *testing.T) {
defer cleanupStorage(t)

Expand All @@ -55,6 +55,10 @@ func TestCreateStream(t *testing.T) {
// Creating the same stream returns ErrStreamExists.
err = client.CreateStream(context.Background(), "foo", "bar")
require.Equal(t, lift.ErrStreamExists, err)

// Creating a reserved stream fails.
err = client.CreateStream(context.Background(), "foo", cursorsStream)
require.Error(t, err)
}

// Ensure creating a stream works when we send the request to the metadata
Expand Down Expand Up @@ -223,7 +227,8 @@ func TestSubscribeStreamNoSuchStream(t *testing.T) {
require.Contains(t, err.Error(), "No such partition")
}

// Ensure getting a deleted stream returns nil.
// Ensure getting a deleted stream returns nil and deleting a reserved stream
// returns an error.
func TestDeleteStream(t *testing.T) {
defer cleanupStorage(t)

Expand Down Expand Up @@ -277,6 +282,10 @@ func TestDeleteStream(t *testing.T) {

err = client.CreateStream(context.Background(), "foo", "foo", lift.Partitions(3))
require.NoError(t, err)

// Deleting a reserved stream fails.
err = client.DeleteStream(context.Background(), cursorsStream)
require.Error(t, err)
}

// Ensure deleting a stream works when we send the request to the metadata
Expand Down
2 changes: 1 addition & 1 deletion server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ func (m *metadataAPI) AddStream(protoStream *proto.Stream, recovered bool, epoch

config := protoStream.GetConfig()
creationTime := time.Unix(0, protoStream.CreationTimestamp)
stream := newStream(protoStream.Name, protoStream.Subject, config, creationTime)
stream := newStream(protoStream.Name, protoStream.Subject, config, creationTime, m.config)
m.streams[protoStream.Name] = stream

for _, partition := range protoStream.Partitions {
Expand Down
3 changes: 3 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
cursorsStream = "__cursors"
)

// reservedStreams contains reserved internal stream names.
var reservedStreams = []string{activityStream, cursorsStream}

// RaftLog represents an entry into the Raft log.
type RaftLog struct {
*raft.Log
Expand Down
23 changes: 21 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,33 @@ type stream struct {

// newStream creates a stream for the given NATS subject. All stream
// interactions should only go through the exported functions.
func newStream(name, subject string, config *proto.StreamConfig, creationTime time.Time) *stream {
return &stream{
func newStream(name, subject string, config *proto.StreamConfig, creationTime time.Time,
srvConfig *Config) *stream {

s := &stream{
name: name,
subject: subject,
config: config,
partitions: make(map[int32]*partition),
creationTime: creationTime,
}
if isReservedStream(name) {
applyReservedStreamOverrides(s, srvConfig)
}
return s
}

// applyReservedStreamOverrides lets us apply special handling to internal
// streams. This is used to allow changing configuration of internal streams on
// server restart without having to apply the changes through Raft.
// TODO: This is a bit of a hack and should probably be replaced if/when a
// stream patch Raft operation is supported.
func applyReservedStreamOverrides(s *stream, config *Config) {
if s.name == cursorsStream {
s.config.AutoPauseTime = &proto.NullableInt64{
Value: config.CursorsStream.AutoPauseTime.Milliseconds(),
}
}
}

// String returns a human-readable representation of the stream.
Expand Down

0 comments on commit edc316d

Please sign in to comment.