Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable max buffer size for watch change tracker #2044

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 102 additions & 20 deletions internal/datastore/common/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ const (
// Changes represents a set of datastore mutations that are kept self-consistent
// across one or more transaction revisions.
type Changes[R datastore.Revision, K comparable] struct {
records map[K]changeRecord[R]
keyFunc func(R) K
content datastore.WatchContent
records map[K]changeRecord[R]
keyFunc func(R) K
content datastore.WatchContent
maxByteSize uint64
currentByteSize int64
}

type changeRecord[R datastore.Revision] struct {
Expand All @@ -36,11 +38,13 @@ type changeRecord[R datastore.Revision] struct {
}

// NewChanges creates a new Changes object for change tracking and de-duplication.
func NewChanges[R datastore.Revision, K comparable](keyFunc func(R) K, content datastore.WatchContent) *Changes[R, K] {
func NewChanges[R datastore.Revision, K comparable](keyFunc func(R) K, content datastore.WatchContent, maxByteSize uint64) *Changes[R, K] {
return &Changes[R, K]{
records: make(map[K]changeRecord[R], 0),
keyFunc: keyFunc,
content: content,
records: make(map[K]changeRecord[R], 0),
keyFunc: keyFunc,
content: content,
maxByteSize: maxByteSize,
currentByteSize: 0,
}
}

Expand All @@ -60,28 +64,68 @@ func (ch *Changes[R, K]) AddRelationshipChange(
return nil
}

record := ch.recordForRevision(rev)
record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

tplKey := tuple.StringWithoutCaveat(tpl)

switch op {
case core.RelationTupleUpdate_TOUCH:
// If there was a delete for the same tuple at the same revision, drop it
delete(record.tupleDeletes, tplKey)
existing, ok := record.tupleDeletes[tplKey]
if ok {
delete(record.tupleDeletes, tplKey)
if err := ch.adjustByteSize(existing, -1); err != nil {
return err
}
}

record.tupleTouches[tplKey] = tpl
if err := ch.adjustByteSize(tpl, 1); err != nil {
return err
}

case core.RelationTupleUpdate_DELETE:
_, alreadyTouched := record.tupleTouches[tplKey]
if !alreadyTouched {
record.tupleDeletes[tplKey] = tpl
if err := ch.adjustByteSize(tpl, 1); err != nil {
return err
}
}

default:
log.Ctx(ctx).Warn().Stringer("operation", op).Msg("unknown change operation")
return spiceerrors.MustBugf("unknown change operation")
}
return nil
}

func (ch *Changes[R, K]) recordForRevision(rev R) changeRecord[R] {
type sized interface {
SizeVT() int
}

func (ch *Changes[R, K]) adjustByteSize(item sized, delta int) error {
if ch.maxByteSize == 0 {
return nil
}

size := item.SizeVT() * delta
ch.currentByteSize += int64(size)
if ch.currentByteSize < 0 {
return spiceerrors.MustBugf("byte size underflow")
}

if ch.currentByteSize > int64(ch.maxByteSize) {
return NewMaximumChangesSizeExceededError(ch.maxByteSize)
}

return nil
}

func (ch *Changes[R, K]) recordForRevision(rev R) (changeRecord[R], error) {
k := ch.keyFunc(rev)
revisionChanges, ok := ch.records[k]
if !ok {
Expand All @@ -96,39 +140,49 @@ func (ch *Changes[R, K]) recordForRevision(rev R) changeRecord[R] {
ch.records[k] = revisionChanges
}

return revisionChanges
return revisionChanges, nil
}

// AddDeletedNamespace adds a change indicating that the namespace with the name was deleted.
func (ch *Changes[R, K]) AddDeletedNamespace(
_ context.Context,
rev R,
namespaceName string,
) {
) error {
if ch.content&datastore.WatchSchema != datastore.WatchSchema {
return
return nil
}

record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

record := ch.recordForRevision(rev)
delete(record.definitionsChanged, nsPrefix+namespaceName)

record.namespacesDeleted[namespaceName] = struct{}{}
return nil
}

// AddDeletedCaveat adds a change indicating that the caveat with the name was deleted.
func (ch *Changes[R, K]) AddDeletedCaveat(
_ context.Context,
rev R,
caveatName string,
) {
) error {
if ch.content&datastore.WatchSchema != datastore.WatchSchema {
return
return nil
}

record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

record := ch.recordForRevision(rev)
delete(record.definitionsChanged, caveatPrefix+caveatName)

record.caveatsDeleted[caveatName] = struct{}{}
return nil
}

// AddChangedDefinition adds a change indicating that the schema definition (namespace or caveat)
Expand All @@ -137,24 +191,52 @@ func (ch *Changes[R, K]) AddChangedDefinition(
ctx context.Context,
rev R,
def datastore.SchemaDefinition,
) {
) error {
if ch.content&datastore.WatchSchema != datastore.WatchSchema {
return
return nil
}

record := ch.recordForRevision(rev)
record, err := ch.recordForRevision(rev)
if err != nil {
return err
}

switch t := def.(type) {
case *core.NamespaceDefinition:
delete(record.namespacesDeleted, t.Name)

if existing, ok := record.definitionsChanged[nsPrefix+t.Name]; ok {
if err := ch.adjustByteSize(existing, -1); err != nil {
return err
}
}

record.definitionsChanged[nsPrefix+t.Name] = t

if err := ch.adjustByteSize(t, 1); err != nil {
return err
}

case *core.CaveatDefinition:
delete(record.caveatsDeleted, t.Name)

if existing, ok := record.definitionsChanged[nsPrefix+t.Name]; ok {
if err := ch.adjustByteSize(existing, -1); err != nil {
return err
}
}

record.definitionsChanged[caveatPrefix+t.Name] = t

if err := ch.adjustByteSize(t, 1); err != nil {
return err
}

default:
log.Ctx(ctx).Fatal().Msg("unknown schema definition kind")
}

return nil
}

// AsRevisionChanges returns the list of changes processed so far as a datastore watch
Expand Down
85 changes: 72 additions & 13 deletions internal/datastore/common/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestChanges(t *testing.T) {
require := require.New(t)

ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
for _, step := range tc.script {
if step.relationship != "" {
rel := tuple.MustParse(step.relationship)
Expand All @@ -315,15 +315,18 @@ func TestChanges(t *testing.T) {
}

for _, changed := range step.changedDefinitions {
ch.AddChangedDefinition(ctx, revisions.NewForTransactionID(step.revision), changed)
err := ch.AddChangedDefinition(ctx, revisions.NewForTransactionID(step.revision), changed)
require.NoError(err)
}

for _, ns := range step.deletedNamespaces {
ch.AddDeletedNamespace(ctx, revisions.NewForTransactionID(step.revision), ns)
err := ch.AddDeletedNamespace(ctx, revisions.NewForTransactionID(step.revision), ns)
require.NoError(err)
}

for _, c := range step.deletedCaveats {
ch.AddDeletedCaveat(ctx, revisions.NewForTransactionID(step.revision), c)
err := ch.AddDeletedCaveat(ctx, revisions.NewForTransactionID(step.revision), c)
require.NoError(err)
}
}

Expand All @@ -337,7 +340,7 @@ func TestChanges(t *testing.T) {

func TestFilteredSchemaChanges(t *testing.T) {
ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchSchema)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchSchema, 0)
require.True(t, ch.IsEmpty())

require.NoError(t, ch.AddRelationshipChange(ctx, rev1, tuple.MustParse("document:firstdoc#viewer@user:tom"), core.RelationTupleUpdate_TOUCH))
Expand All @@ -346,22 +349,28 @@ func TestFilteredSchemaChanges(t *testing.T) {

func TestFilteredRelationshipChanges(t *testing.T) {
ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships, 0)
require.True(t, ch.IsEmpty())

ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
err := ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
require.NoError(t, err)
require.True(t, ch.IsEmpty())
}

func TestFilterAndRemoveRevisionChanges(t *testing.T) {
ctx := context.Background()
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)

require.True(t, ch.IsEmpty())

ch.AddDeletedNamespace(ctx, rev1, "deletedns1")
ch.AddDeletedNamespace(ctx, rev2, "deletedns2")
ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
err := ch.AddDeletedNamespace(ctx, rev1, "deletedns1")
require.NoError(t, err)

err = ch.AddDeletedNamespace(ctx, rev2, "deletedns2")
require.NoError(t, err)

err = ch.AddDeletedNamespace(ctx, rev3, "deletedns3")
require.NoError(t, err)

require.False(t, ch.IsEmpty())

Expand Down Expand Up @@ -408,7 +417,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
func TestHLCOrdering(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
require.True(t, ch.IsEmpty())

rev1, err := revisions.HLCRevisionFromString("1.0000000001")
Expand Down Expand Up @@ -451,7 +460,7 @@ func TestHLCOrdering(t *testing.T) {
func TestHLCSameRevision(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema)
ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
require.True(t, ch.IsEmpty())

rev0, err := revisions.HLCRevisionFromString("1")
Expand Down Expand Up @@ -496,6 +505,56 @@ func TestHLCSameRevision(t *testing.T) {
}, remaining)
}

func TestMaximumSize(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 150)
require.True(t, ch.IsEmpty())

rev0, err := revisions.HLCRevisionFromString("1")
require.NoError(t, err)

rev1, err := revisions.HLCRevisionFromString("2")
require.NoError(t, err)

rev2, err := revisions.HLCRevisionFromString("3")
require.NoError(t, err)

rev3, err := revisions.HLCRevisionFromString("4")
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev1, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev2, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev3, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.Error(t, err)
require.ErrorContains(t, err, "maximum changes byte size of 150 exceeded")
}

func TestMaximumSizeReplacement(t *testing.T) {
ctx := context.Background()

ch := NewChanges(revisions.HLCKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 43)
require.True(t, ch.IsEmpty())

rev0, err := revisions.HLCRevisionFromString("1")
require.NoError(t, err)

err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
require.NoError(t, err)
require.Equal(t, int64(43), ch.currentByteSize)

err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_DELETE)
require.NoError(t, err)
require.Equal(t, int64(43), ch.currentByteSize)
}

func TestCanonicalize(t *testing.T) {
testCases := []struct {
name string
Expand Down
11 changes: 11 additions & 0 deletions internal/datastore/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,14 @@ type RevisionUnavailableError struct {
func NewRevisionUnavailableError(err error) error {
return RevisionUnavailableError{err}
}

// MaximumChangesSizeExceededError is returned when the maximum size of changes is exceeded.
type MaximumChangesSizeExceededError struct {
error
maxSize uint64
}

// NewMaximumChangesSizeExceededError creates a new MaximumChangesSizeExceededError.
func NewMaximumChangesSizeExceededError(maxSize uint64) error {
return MaximumChangesSizeExceededError{fmt.Errorf("maximum changes byte size of %d exceeded", maxSize), maxSize}
}
Loading
Loading