Skip to content

Commit

Permalink
Merge pull request #2044 from josephschorr/watch-change-buffer-limit
Browse files Browse the repository at this point in the history
Add configurable max buffer size for watch change tracker
  • Loading branch information
vroldanbet authored Aug 30, 2024
2 parents 78047ed + 0446dcc commit c237aa2
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 54 deletions.
122 changes: 102 additions & 20 deletions internal/datastore/common/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ const (
// Changes represents a set of datastore mutations that are kept self-consistent
// across one or more transaction revisions.
type Changes[R datastore.Revision, K comparable] struct {
records map[K]changeRecord[R]
keyFunc func(R) K
content datastore.WatchContent
records map[K]changeRecord[R]
keyFunc func(R) K
content datastore.WatchContent
maxByteSize uint64
currentByteSize int64
}

type changeRecord[R datastore.Revision] struct {
Expand All @@ -36,11 +38,13 @@ type changeRecord[R datastore.Revision] struct {
}

// NewChanges creates a new Changes object for change tracking and de-duplication.
func NewChanges[R datastore.Revision, K comparable](keyFunc func(R) K, content datastore.WatchContent) *Changes[R, K] {
func NewChanges[R datastore.Revision, K comparable](keyFunc func(R) K, content datastore.WatchContent, maxByteSize uint64) *Changes[R, K] {
return &Changes[R, K]{
records: make(map[K]changeRecord[R], 0),
keyFunc: keyFunc,
content: content,
records: make(map[K]changeRecord[R], 0),
keyFunc: keyFunc,
content: content,
maxByteSize: maxByteSize,
currentByteSize: 0,
}
}

Expand All @@ -60,28 +64,68 @@ func (ch *Changes[R, K]) AddRelationshipChange(
return nil
}

record := ch.recordForRevision(rev)
record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

tplKey := tuple.StringWithoutCaveat(tpl)

switch op {
case core.RelationTupleUpdate_TOUCH:
// If there was a delete for the same tuple at the same revision, drop it
delete(record.tupleDeletes, tplKey)
existing, ok := record.tupleDeletes[tplKey]
if ok {
delete(record.tupleDeletes, tplKey)
if err := ch.adjustByteSize(existing, -1); err != nil {
return err
}
}

record.tupleTouches[tplKey] = tpl
if err := ch.adjustByteSize(tpl, 1); err != nil {
return err
}

case core.RelationTupleUpdate_DELETE:
_, alreadyTouched := record.tupleTouches[tplKey]
if !alreadyTouched {
record.tupleDeletes[tplKey] = tpl
if err := ch.adjustByteSize(tpl, 1); err != nil {
return err
}
}

default:
log.Ctx(ctx).Warn().Stringer("operation", op).Msg("unknown change operation")
return spiceerrors.MustBugf("unknown change operation")
}
return nil
}

func (ch *Changes[R, K]) recordForRevision(rev R) changeRecord[R] {
type sized interface {
SizeVT() int
}

func (ch *Changes[R, K]) adjustByteSize(item sized, delta int) error {
if ch.maxByteSize == 0 {
return nil
}

size := item.SizeVT() * delta
ch.currentByteSize += int64(size)
if ch.currentByteSize < 0 {
return spiceerrors.MustBugf("byte size underflow")
}

if ch.currentByteSize > int64(ch.maxByteSize) {
return NewMaximumChangesSizeExceededError(ch.maxByteSize)
}

return nil
}

func (ch *Changes[R, K]) recordForRevision(rev R) (changeRecord[R], error) {
k := ch.keyFunc(rev)
revisionChanges, ok := ch.records[k]
if !ok {
Expand All @@ -96,39 +140,49 @@ func (ch *Changes[R, K]) recordForRevision(rev R) changeRecord[R] {
ch.records[k] = revisionChanges
}

return revisionChanges
return revisionChanges, nil
}

// AddDeletedNamespace adds a change indicating that the namespace with the name was deleted.
func (ch *Changes[R, K]) AddDeletedNamespace(
_ context.Context,
rev R,
namespaceName string,
) {
) error {
if ch.content&datastore.WatchSchema != datastore.WatchSchema {
return
return nil
}

record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

record := ch.recordForRevision(rev)
delete(record.definitionsChanged, nsPrefix+namespaceName)

record.namespacesDeleted[namespaceName] = struct{}{}
return nil
}

// AddDeletedCaveat adds a change indicating that the caveat with the name was deleted.
func (ch *Changes[R, K]) AddDeletedCaveat(
_ context.Context,
rev R,
caveatName string,
) {
) error {
if ch.content&datastore.WatchSchema != datastore.WatchSchema {
return
return nil
}

record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

record := ch.recordForRevision(rev)
delete(record.definitionsChanged, caveatPrefix+caveatName)

record.caveatsDeleted[caveatName] = struct{}{}
return nil
}

// AddChangedDefinition adds a change indicating that the schema definition (namespace or caveat)
Expand All @@ -137,24 +191,52 @@ func (ch *Changes[R, K]) AddChangedDefinition(
ctx context.Context,
rev R,
def datastore.SchemaDefinition,
) {
) error {
if ch.content&datastore.WatchSchema != datastore.WatchSchema {
return
return nil
}

record := ch.recordForRevision(rev)
record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

switch t := def.(type) {
case *core.NamespaceDefinition:
delete(record.namespacesDeleted, t.Name)

if existing, ok := record.definitionsChanged[nsPrefix+t.Name]; ok {
if err := ch.adjustByteSize(existing, -1); err != nil {
return err
}
}

record.definitionsChanged[nsPrefix+t.Name] = t

if err := ch.adjustByteSize(t, 1); err != nil {
return err
}

case *core.CaveatDefinition:
delete(record.caveatsDeleted, t.Name)

if existing, ok := record.definitionsChanged[nsPrefix+t.Name]; ok {
if err := ch.adjustByteSize(existing, -1); err != nil {
return err
}
}

record.definitionsChanged[caveatPrefix+t.Name] = t

if err := ch.adjustByteSize(t, 1); err != nil {
return err
}

default:
log.Ctx(ctx).Fatal().Msg("unknown schema definition kind")
}

return nil
}

// AsRevisionChanges returns the list of changes processed so far as a datastore watch
Expand Down
85 changes: 72 additions & 13 deletions internal/datastore/common/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestChanges(t *testing.T) {
require := require.New(t)

ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
for _, step := range tc.script {
if step.relationship != "" {
rel := tuple.MustParse(step.relationship)
Expand All @@ -315,15 +315,18 @@ func TestChanges(t *testing.T) {
}

for _, changed := range step.changedDefinitions {
ch.AddChangedDefinition(ctx, revisions.NewForTransactionID(step.revision), changed)
err := ch.AddChangedDefinition(ctx, revisions.NewForTransactionID(step.revision), changed)
require.NoError(err)
}

for _, ns := range step.deletedNamespaces {
ch.AddDeletedNamespace(ctx, revisions.NewForTransactionID(step.revision), ns)
err := ch.AddDeletedNamespace(ctx, revisions.NewForTransactionID(step.revision), ns)
require.NoError(err)
}

for _, c := range step.deletedCaveats {
ch.AddDeletedCaveat(ctx, revisions.NewForTransactionID(step.revision), c)
err := ch.AddDeletedCaveat(ctx, revisions.NewForTransactionID(step.revision), c)
require.NoError(err)
}
}

Expand All @@ -337,7 +340,7 @@ func TestChanges(t *testing.T) {

func TestFilteredSchemaChanges(t *testing.T) {
ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchSchema)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchSchema, 0)
require.True(t, ch.IsEmpty())

require.NoError(t, ch.AddRelationshipChange(ctx, rev1, tuple.MustParse("document:firstdoc#viewer@user:tom"), core.RelationTupleUpdate_TOUCH))
Expand All @@ -346,22 +349,28 @@ func TestFilteredSchemaChanges(t *testing.T) {

func TestFilteredRelationshipChanges(t *testing.T) {
ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships, 0)
require.True(t, ch.IsEmpty())

ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
err := ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
require.NoError(t, err)
require.True(t, ch.IsEmpty())
}

func TestFilterAndRemoveRevisionChanges(t *testing.T) {
ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)

require.True(t, ch.IsEmpty())

ch.AddDeletedNamespace(ctx, rev1, "deletedns1")
ch.AddDeletedNamespace(ctx, rev2, "deletedns2")
ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
err := ch.AddDeletedNamespace(ctx, rev1, "deletedns1")
require.NoError(t, err)

err = ch.AddDeletedNamespace(ctx, rev2, "deletedns2")
require.NoError(t, err)

err = ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
require.NoError(t, err)

require.False(t, ch.IsEmpty())

Expand Down Expand Up @@ -408,7 +417,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
func TestHLCOrdering(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
require.True(t, ch.IsEmpty())

rev1, err := revisions.HLCRevisionFromString("1.0000000001")
Expand Down Expand Up @@ -451,7 +460,7 @@ func TestHLCOrdering(t *testing.T) {
func TestHLCSameRevision(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
require.True(t, ch.IsEmpty())

rev0, err := revisions.HLCRevisionFromString("1")
Expand Down Expand Up @@ -496,6 +505,56 @@ func TestHLCSameRevision(t *testing.T) {
}, remaining)
}

func TestMaximumSize(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 150)
require.True(t, ch.IsEmpty())

rev0, err := revisions.HLCRevisionFromString("1")
require.NoError(t, err)

rev1, err := revisions.HLCRevisionFromString("2")
require.NoError(t, err)

rev2, err := revisions.HLCRevisionFromString("3")
require.NoError(t, err)

rev3, err := revisions.HLCRevisionFromString("4")
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev1, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev2, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev3, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.Error(t, err)
require.ErrorContains(t, err, "maximum changes byte size of 150 exceeded")
}

func TestMaximumSizeReplacement(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 43)
require.True(t, ch.IsEmpty())

rev0, err := revisions.HLCRevisionFromString("1")
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)
require.Equal(t, int64(43), ch.currentByteSize)

err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_DELETE)
require.NoError(t, err)
require.Equal(t, int64(43), ch.currentByteSize)
}

func TestCanonicalize(t *testing.T) {
testCases := []struct {
name string
Expand Down
11 changes: 11 additions & 0 deletions internal/datastore/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,14 @@ type RevisionUnavailableError struct {
func NewRevisionUnavailableError(err error) error {
return RevisionUnavailableError{err}
}

// MaximumChangesSizeExceededError is returned when the maximum size of changes is exceeded.
type MaximumChangesSizeExceededError struct {
error
maxSize uint64
}

// NewMaximumChangesSizeExceededError creates a new MaximumChangesSizeExceededError.
func NewMaximumChangesSizeExceededError(maxSize uint64) error {
return MaximumChangesSizeExceededError{fmt.Errorf("maximum changes byte size of %d exceeded", maxSize), maxSize}
}
Loading

0 comments on commit c237aa2

Please sign in to comment.