From 031f8b267df44ff6fd28ba969a588f2aa15571d3 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 31 Aug 2023 16:26:32 -0400 Subject: [PATCH] Fixes for the watching schema cache 1) Make sure to use the *caching* datastore proxy as the fallback, as opposed to always reading 2) Prepopulate the definition caches so that the watching cache can function on existing schema without a WriteSchema call being necessary 3) Add tests for the above and fix a discovered issue in the existing caching proxy around error type checking --- .../datastore/proxy/schemacaching/caching.go | 2 +- .../proxy/schemacaching/standardcache.go | 3 +- .../proxy/schemacaching/watchingcache.go | 93 ++++++++-- .../proxy/schemacaching/watchingcache_test.go | 163 ++++++++++++++---- 4 files changed, 206 insertions(+), 55 deletions(-) diff --git a/internal/datastore/proxy/schemacaching/caching.go b/internal/datastore/proxy/schemacaching/caching.go index 1480e797b1..3ed1fc693b 100644 --- a/internal/datastore/proxy/schemacaching/caching.go +++ b/internal/datastore/proxy/schemacaching/caching.go @@ -69,5 +69,5 @@ func NewCachingDatastoreProxy(delegate datastore.Datastore, c cache.Cache, gcWin } } - return createWatchingCacheProxy(watchable, gcWindow) + return createWatchingCacheProxy(watchable, c, gcWindow) } diff --git a/internal/datastore/proxy/schemacaching/standardcache.go b/internal/datastore/proxy/schemacaching/standardcache.go index 8ef987d0bd..eb52ed9bb5 100644 --- a/internal/datastore/proxy/schemacaching/standardcache.go +++ b/internal/datastore/proxy/schemacaching/standardcache.go @@ -175,7 +175,7 @@ func readAndCache[T schemaDefinition]( // sever the context so that another branch doesn't cancel the // single-flighted read loaded, updatedRev, err := reader(internaldatastore.SeparateContextWithTracing(ctx), name) - if err != nil && !errors.Is(err, &datastore.ErrNamespaceNotFound{}) && !errors.Is(err, &datastore.ErrCaveatNameNotFound{}) { + if err != nil && !errors.As(err, &datastore.ErrNamespaceNotFound{}) && !errors.As(err, &datastore.ErrCaveatNameNotFound{}) { // Propagate this error to the caller return nil, err } @@ -187,7 +187,6 @@ func readAndCache[T schemaDefinition]( // We have to call wait here or else Ristretto may not have the key // available to a subsequent caller. r.p.c.Wait() - return entry, nil }) if err != nil { diff --git a/internal/datastore/proxy/schemacaching/watchingcache.go b/internal/datastore/proxy/schemacaching/watchingcache.go index b199665e1b..ecc088c8f6 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache.go +++ b/internal/datastore/proxy/schemacaching/watchingcache.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog/log" + "github.com/authzed/spicedb/pkg/cache" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" "github.com/authzed/spicedb/pkg/datastore/revision" @@ -62,17 +63,24 @@ func init() { type watchingCachingProxy struct { datastore.SchemaWatchableDatastore - gcWindow time.Duration - closed chan bool + fallbackCache *definitionCachingProxy + gcWindow time.Duration + closed chan bool namespaceCache *schemaWatchCache[*core.NamespaceDefinition] caveatCache *schemaWatchCache[*core.CaveatDefinition] } // createWatchingCacheProxy creates and returns a watching cache proxy. -func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, gcWindow time.Duration) *watchingCachingProxy { +func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, c cache.Cache, gcWindow time.Duration) *watchingCachingProxy { + fallbackCache := &definitionCachingProxy{ + Datastore: wrapped, + c: c, + } + proxy := &watchingCachingProxy{ SchemaWatchableDatastore: wrapped, + fallbackCache: fallbackCache, gcWindow: gcWindow, closed: make(chan bool, 2), @@ -81,10 +89,10 @@ func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, gcWind "namespace", datastore.NewNamespaceNotFoundErr, func(ctx context.Context, name string, revision datastore.Revision) (*core.NamespaceDefinition, datastore.Revision, error) { - return wrapped.SnapshotReader(revision).ReadNamespaceByName(ctx, name) + return fallbackCache.SnapshotReader(revision).ReadNamespaceByName(ctx, name) }, func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[*core.NamespaceDefinition], error) { - return wrapped.SnapshotReader(revision).LookupNamespacesWithNames(ctx, names) + return fallbackCache.SnapshotReader(revision).LookupNamespacesWithNames(ctx, names) }, definitionsReadCachedCounter, definitionsReadTotalCounter, @@ -94,10 +102,10 @@ func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, gcWind "caveat", datastore.NewCaveatNameNotFoundErr, func(ctx context.Context, name string, revision datastore.Revision) (*core.CaveatDefinition, datastore.Revision, error) { - return wrapped.SnapshotReader(revision).ReadCaveatByName(ctx, name) + return fallbackCache.SnapshotReader(revision).ReadCaveatByName(ctx, name) }, func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[*core.CaveatDefinition], error) { - return wrapped.SnapshotReader(revision).LookupCaveatsWithNames(ctx, names) + return fallbackCache.SnapshotReader(revision).LookupCaveatsWithNames(ctx, names) }, definitionsReadCachedCounter, definitionsReadTotalCounter, @@ -117,17 +125,22 @@ func (p *watchingCachingProxy) ReadWriteTx( f datastore.TxUserFunc, opts ...options.RWTOptionsOption, ) (datastore.Revision, error) { - return p.SchemaWatchableDatastore.ReadWriteTx(ctx, func(delegateRWT datastore.ReadWriteTransaction) error { - // NOTE: we always use the standard approach cache here, as it stores changes within the transaction - // itself, and should not impact the overall updating cache. - rwt := &definitionCachingRWT{delegateRWT, &sync.Map{}} - return f(rwt) - }, opts...) + // NOTE: we always use the standard approach cache here, as it stores changes within the transaction + // itself, and should not impact the overall updating cache. + return p.fallbackCache.ReadWriteTx(ctx, f, opts...) } func (p *watchingCachingProxy) Start(ctx context.Context) error { - log.Info().Msg("starting watching cache") + // Start async so that prepopulating doesn't block the server start. + go func() { + _ = p.startSync(ctx) + }() + + return nil +} +func (p *watchingCachingProxy) startSync(ctx context.Context) error { + log.Info().Msg("starting watching cache") headRev, err := p.SchemaWatchableDatastore.HeadRevision(context.Background()) if err != nil { p.namespaceCache.setFallbackMode() @@ -165,14 +178,60 @@ func (p *watchingCachingProxy) Start(ctx context.Context) error { // Start watching for schema changes. go (func() { log.Debug().Str("revision", headRev.String()).Msg("starting watching cache watch goroutine") + reader := p.SchemaWatchableDatastore.SnapshotReader(headRev) - p.namespaceCache.startAtRevision(headRev) - p.caveatCache.startAtRevision(headRev) + // Populate the cache with all definitions at the head revision. + log.Info().Str("revision", headRev.String()).Msg("prepopulating namespace watching cache") + namespaces, err := reader.ListAllNamespaces(ctx) + if err != nil { + p.namespaceCache.setFallbackMode() + p.caveatCache.setFallbackMode() + log.Warn().Err(err).Msg("received error in schema watch") + wg.Done() + return + } + + for _, namespaceDef := range namespaces { + err := p.namespaceCache.updateDefinition(namespaceDef.Definition.Name, namespaceDef.Definition, false, headRev) + if err != nil { + p.namespaceCache.setFallbackMode() + p.caveatCache.setFallbackMode() + log.Warn().Err(err).Msg("received error in schema watch") + wg.Done() + return + } + } + log.Info().Str("revision", headRev.String()).Int("count", len(namespaces)).Msg("populated namespace watching cache") + + log.Info().Str("revision", headRev.String()).Msg("prepopulating caveat watching cache") + caveats, err := reader.ListAllCaveats(ctx) + if err != nil { + p.namespaceCache.setFallbackMode() + p.caveatCache.setFallbackMode() + log.Warn().Err(err).Msg("received error in schema watch") + wg.Done() + return + } + + for _, caveatDef := range caveats { + err := p.caveatCache.updateDefinition(caveatDef.Definition.Name, caveatDef.Definition, false, headRev) + if err != nil { + p.namespaceCache.setFallbackMode() + p.caveatCache.setFallbackMode() + log.Warn().Err(err).Msg("received error in schema watch") + wg.Done() + return + } + } + log.Info().Str("revision", headRev.String()).Int("count", len(caveats)).Msg("populated caveat watching cache") log.Debug().Str("revision", headRev.String()).Msg("beginning schema watch") ssc, serrc := p.SchemaWatchableDatastore.WatchSchema(context.Background(), headRev) log.Debug().Msg("schema watch started") + p.namespaceCache.startAtRevision(headRev) + p.caveatCache.startAtRevision(headRev) + wg.Done() for { @@ -262,6 +321,7 @@ func (p *watchingCachingProxy) Close() error { p.closed <- true p.closed <- true + p.fallbackCache.Close() return p.SchemaWatchableDatastore.Close() } @@ -352,7 +412,6 @@ func (swc *schemaWatchCache[T]) startAtRevision(revision datastore.Revision) { swc.lock.Lock() defer swc.lock.Unlock() - swc.entries = map[string]*intervalTracker[revisionedEntry[T]]{} swc.checkpointRevision = revision swc.inFallbackMode = false diff --git a/internal/datastore/proxy/schemacaching/watchingcache_test.go b/internal/datastore/proxy/schemacaching/watchingcache_test.go index b2bb553a9f..c5da6383aa 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache_test.go +++ b/internal/datastore/proxy/schemacaching/watchingcache_test.go @@ -11,6 +11,7 @@ import ( "go.uber.org/goleak" "golang.org/x/exp/slices" + "github.com/authzed/spicedb/pkg/cache" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -34,16 +35,16 @@ func TestWatchingCacheBasicOperation(t *testing.T) { errChan: make(chan error, 1), } - cache := createWatchingCacheProxy(fakeDS, 1*time.Hour) - require.NoError(t, cache.Start(context.Background())) + wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache(), 1*time.Hour) + require.NoError(t, wcache.startSync(context.Background())) // Ensure no namespaces are found. - _, _, err := cache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") + _, _, err := wcache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) - require.False(t, cache.namespaceCache.inFallbackMode) + require.False(t, wcache.namespaceCache.inFallbackMode) // Ensure a re-read also returns not found, even before a checkpoint is received. - _, _, err = cache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") + _, _, err = wcache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) // Send a checkpoint for revision 1. @@ -53,7 +54,7 @@ func TestWatchingCacheBasicOperation(t *testing.T) { fakeDS.updateNamespace("somenamespace", &corev1.NamespaceDefinition{Name: "somenamespace"}, rev("2")) // Ensure that reading at rev 2 returns found. - nsDef, _, err := cache.SnapshotReader(rev("2")).ReadNamespaceByName(context.Background(), "somenamespace") + nsDef, _, err := wcache.SnapshotReader(rev("2")).ReadNamespaceByName(context.Background(), "somenamespace") require.NoError(t, err) require.Equal(t, "somenamespace", nsDef.Name) @@ -61,7 +62,7 @@ func TestWatchingCacheBasicOperation(t *testing.T) { fakeDS.disableReads() // Ensure that reading at rev 3 returns an error, as with reads disabled the cache should not be hit. - _, _, err = cache.SnapshotReader(rev("3")).ReadNamespaceByName(context.Background(), "somenamespace") + _, _, err = wcache.SnapshotReader(rev("3")).ReadNamespaceByName(context.Background(), "somenamespace") require.Error(t, err) require.ErrorContains(t, err, "reads are disabled") @@ -70,24 +71,24 @@ func TestWatchingCacheBasicOperation(t *testing.T) { // Ensure that reading at rev 3 returns found, even though the cache should not yet be there. This will // require a datastore fallback read because the cache is not yet checkedpointed to that revision. - nsDef, _, err = cache.SnapshotReader(rev("3")).ReadNamespaceByName(context.Background(), "somenamespace") + nsDef, _, err = wcache.SnapshotReader(rev("3")).ReadNamespaceByName(context.Background(), "somenamespace") require.NoError(t, err) require.Equal(t, "somenamespace", nsDef.Name) // Checkpoint to rev 4. fakeDS.sendCheckpoint(rev("4")) - require.False(t, cache.namespaceCache.inFallbackMode) + require.False(t, wcache.namespaceCache.inFallbackMode) // Disable reads. fakeDS.disableReads() // Read again, which should now be via the cache. - nsDef, _, err = cache.SnapshotReader(rev("3.5")).ReadNamespaceByName(context.Background(), "somenamespace") + nsDef, _, err = wcache.SnapshotReader(rev("3.5")).ReadNamespaceByName(context.Background(), "somenamespace") require.NoError(t, err) require.Equal(t, "somenamespace", nsDef.Name) // Read via a lookup. - nsDefs, err := cache.SnapshotReader(rev("3.5")).LookupNamespacesWithNames(context.Background(), []string{"somenamespace"}) + nsDefs, err := wcache.SnapshotReader(rev("3.5")).LookupNamespacesWithNames(context.Background(), []string{"somenamespace"}) require.NoError(t, err) require.Equal(t, "somenamespace", nsDefs[0].Definition.Name) @@ -95,17 +96,17 @@ func TestWatchingCacheBasicOperation(t *testing.T) { fakeDS.updateNamespace("somenamespace", nil, rev("5")) // Re-read at an earlier revision. - nsDef, _, err = cache.SnapshotReader(rev("3.5")).ReadNamespaceByName(context.Background(), "somenamespace") + nsDef, _, err = wcache.SnapshotReader(rev("3.5")).ReadNamespaceByName(context.Background(), "somenamespace") require.NoError(t, err) require.Equal(t, "somenamespace", nsDef.Name) // Read at revision 5. - _, _, err = cache.SnapshotReader(rev("5")).ReadNamespaceByName(context.Background(), "somenamespace") + _, _, err = wcache.SnapshotReader(rev("5")).ReadNamespaceByName(context.Background(), "somenamespace") require.Error(t, err) require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}, "missing not found in: %v", err) // Lookup at revision 5. - nsDefs, err = cache.SnapshotReader(rev("5")).LookupNamespacesWithNames(context.Background(), []string{"somenamespace"}) + nsDefs, err = wcache.SnapshotReader(rev("5")).LookupNamespacesWithNames(context.Background(), []string{"somenamespace"}) require.NoError(t, err) require.Empty(t, nsDefs) @@ -113,16 +114,16 @@ func TestWatchingCacheBasicOperation(t *testing.T) { fakeDS.updateCaveat("somecaveat", &corev1.CaveatDefinition{Name: "somecaveat"}, rev("6")) // Read at revision 6. - caveatDef, _, err := cache.SnapshotReader(rev("6")).ReadCaveatByName(context.Background(), "somecaveat") + caveatDef, _, err := wcache.SnapshotReader(rev("6")).ReadCaveatByName(context.Background(), "somecaveat") require.NoError(t, err) require.Equal(t, "somecaveat", caveatDef.Name) // Attempt to read at revision 1, which should require a read. - _, _, err = cache.SnapshotReader(rev("1")).ReadCaveatByName(context.Background(), "somecaveat") + _, _, err = wcache.SnapshotReader(rev("1")).ReadCaveatByName(context.Background(), "somecaveat") require.ErrorContains(t, err, "reads are disabled") // Close the proxy and ensure the background goroutines are terminated. - cache.Close() + wcache.Close() time.Sleep(10 * time.Millisecond) } @@ -137,8 +138,8 @@ func TestWatchingCacheParallelOperations(t *testing.T) { errChan: make(chan error, 1), } - cache := createWatchingCacheProxy(fakeDS, 1*time.Hour) - require.NoError(t, cache.Start(context.Background())) + wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache(), 1*time.Hour) + require.NoError(t, wcache.startSync(context.Background())) // Run some operations in parallel. var wg sync.WaitGroup @@ -146,15 +147,15 @@ func TestWatchingCacheParallelOperations(t *testing.T) { go (func() { // Read somenamespace (which should not be found) - _, _, err := cache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") + _, _, err := wcache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) - require.False(t, cache.namespaceCache.inFallbackMode) + require.False(t, wcache.namespaceCache.inFallbackMode) // Write somenamespace. fakeDS.updateNamespace("somenamespace", &corev1.NamespaceDefinition{Name: "somenamespace"}, rev("2")) // Read again (which should be found now) - nsDef, _, err := cache.SnapshotReader(rev("2")).ReadNamespaceByName(context.Background(), "somenamespace") + nsDef, _, err := wcache.SnapshotReader(rev("2")).ReadNamespaceByName(context.Background(), "somenamespace") require.NoError(t, err) require.Equal(t, "somenamespace", nsDef.Name) @@ -163,14 +164,14 @@ func TestWatchingCacheParallelOperations(t *testing.T) { go (func() { // Read anothernamespace (which should not be found) - _, _, err := cache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "anothernamespace") + _, _, err := wcache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "anothernamespace") require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) - require.False(t, cache.namespaceCache.inFallbackMode) + require.False(t, wcache.namespaceCache.inFallbackMode) // Read again (which should still not be found) - _, _, err = cache.SnapshotReader(rev("3")).ReadNamespaceByName(context.Background(), "anothernamespace") + _, _, err = wcache.SnapshotReader(rev("3")).ReadNamespaceByName(context.Background(), "anothernamespace") require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) - require.False(t, cache.namespaceCache.inFallbackMode) + require.False(t, wcache.namespaceCache.inFallbackMode) wg.Done() })() @@ -178,7 +179,7 @@ func TestWatchingCacheParallelOperations(t *testing.T) { wg.Wait() // Close the proxy and ensure the background goroutines are terminated. - cache.Close() + wcache.Close() time.Sleep(10 * time.Millisecond) } @@ -193,8 +194,8 @@ func TestWatchingCacheParallelReaderWriter(t *testing.T) { errChan: make(chan error, 1), } - cache := createWatchingCacheProxy(fakeDS, 1*time.Hour) - require.NoError(t, cache.Start(context.Background())) + wcache := createWatchingCacheProxy(fakeDS, cache.NoopCache(), 1*time.Hour) + require.NoError(t, wcache.startSync(context.Background())) // Write somenamespace. fakeDS.updateNamespace("somenamespace", &corev1.NamespaceDefinition{Name: "somenamespace"}, rev("0")) @@ -219,7 +220,7 @@ func TestWatchingCacheParallelReaderWriter(t *testing.T) { headRevision, err := fakeDS.HeadRevision(context.Background()) require.NoError(t, err) - nsDef, _, err := cache.SnapshotReader(headRevision).ReadNamespaceByName(context.Background(), "somenamespace") + nsDef, _, err := wcache.SnapshotReader(headRevision).ReadNamespaceByName(context.Background(), "somenamespace") require.NoError(t, err) require.Equal(t, "somenamespace", nsDef.Name) } @@ -230,7 +231,94 @@ func TestWatchingCacheParallelReaderWriter(t *testing.T) { wg.Wait() // Close the proxy and ensure the background goroutines are terminated. - cache.Close() + wcache.Close() + time.Sleep(10 * time.Millisecond) +} + +func TestWatchingCacheFallbackToStandardCache(t *testing.T) { + defer goleak.VerifyNone(t, goleakIgnores...) + + fakeDS := &fakeDatastore{ + headRevision: rev("0"), + namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, + caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, + schemaChan: make(chan *datastore.SchemaState, 1), + errChan: make(chan error, 1), + } + + c, err := cache.NewCache(&cache.Config{ + NumCounters: 1000, + MaxCost: 1000, + DefaultTTL: 1000 * time.Second, + }) + require.NoError(t, err) + + wcache := createWatchingCacheProxy(fakeDS, c, 1*time.Hour) + require.NoError(t, wcache.startSync(context.Background())) + + // Ensure the namespace is not found, but is cached in the fallback caching layer. + _, _, err = wcache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") + require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) + require.False(t, wcache.namespaceCache.inFallbackMode) + + entry, ok := c.Get("n:somenamespace@1") + require.True(t, ok) + require.NotNil(t, entry.(*cacheEntry).notFound) + + // Disable reading and ensure it still works, via the fallback cache. + fakeDS.readsDisabled = true + + _, _, err = wcache.SnapshotReader(rev("1")).ReadNamespaceByName(context.Background(), "somenamespace") + require.ErrorAs(t, err, &datastore.ErrNamespaceNotFound{}) + require.False(t, wcache.namespaceCache.inFallbackMode) + + // Close the proxy and ensure the background goroutines are terminated. + wcache.Close() + time.Sleep(10 * time.Millisecond) +} + +func TestWatchingCachePrepopulated(t *testing.T) { + defer goleak.VerifyNone(t, goleakIgnores...) + + fakeDS := &fakeDatastore{ + headRevision: rev("4"), + namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, + caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, + schemaChan: make(chan *datastore.SchemaState, 1), + errChan: make(chan error, 1), + existingNamespaces: []datastore.RevisionedNamespace{ + datastore.RevisionedDefinition[*corev1.NamespaceDefinition]{ + Definition: &corev1.NamespaceDefinition{ + Name: "somenamespace", + }, + LastWrittenRevision: rev("1"), + }, + datastore.RevisionedDefinition[*corev1.NamespaceDefinition]{ + Definition: &corev1.NamespaceDefinition{ + Name: "anothernamespace", + }, + LastWrittenRevision: rev("2"), + }, + }, + } + + c, err := cache.NewCache(&cache.Config{ + NumCounters: 1000, + MaxCost: 1000, + DefaultTTL: 1000 * time.Second, + }) + require.NoError(t, err) + + wcache := createWatchingCacheProxy(fakeDS, c, 1*time.Hour) + require.NoError(t, wcache.startSync(context.Background())) + + // Ensure the namespace is found. + def, _, err := wcache.SnapshotReader(rev("4")).ReadNamespaceByName(context.Background(), "somenamespace") + require.NoError(t, err) + require.Equal(t, "somenamespace", def.Name) + + // Close the proxy and ensure the background goroutines are terminated. + wcache.Close() time.Sleep(10 * time.Millisecond) } @@ -243,7 +331,8 @@ type fakeDatastore struct { schemaChan chan *datastore.SchemaState errChan chan error - readsDisabled bool + readsDisabled bool + existingNamespaces []datastore.RevisionedNamespace lock sync.RWMutex } @@ -483,11 +572,15 @@ func (fsr *fakeSnapshotReader) ReadCaveatByName(_ context.Context, name string) } func (*fakeSnapshotReader) ListAllCaveats(context.Context) ([]datastore.RevisionedDefinition[*corev1.CaveatDefinition], error) { - return nil, fmt.Errorf("not implemented") + return []datastore.RevisionedDefinition[*corev1.CaveatDefinition]{}, nil } -func (*fakeSnapshotReader) ListAllNamespaces(context.Context) ([]datastore.RevisionedDefinition[*corev1.NamespaceDefinition], error) { - return nil, fmt.Errorf("not implemented") +func (fsr *fakeSnapshotReader) ListAllNamespaces(context.Context) ([]datastore.RevisionedDefinition[*corev1.NamespaceDefinition], error) { + if fsr.fds.existingNamespaces != nil { + return fsr.fds.existingNamespaces, nil + } + + return []datastore.RevisionedDefinition[*corev1.NamespaceDefinition]{}, nil } func (*fakeSnapshotReader) QueryRelationships(context.Context, datastore.RelationshipsFilter, ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) {