Skip to content

Commit

Permalink
Merge pull request #1519 from josephschorr/schema-watch-fixes
Browse files Browse the repository at this point in the history
Fixes for the watching schema cache
  • Loading branch information
josephschorr authored Sep 1, 2023
2 parents 867f309 + 031f8b2 commit 9c446e5
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 55 deletions.
2 changes: 1 addition & 1 deletion internal/datastore/proxy/schemacaching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@ func NewCachingDatastoreProxy(delegate datastore.Datastore, c cache.Cache, gcWin
}
}

return createWatchingCacheProxy(watchable, gcWindow)
return createWatchingCacheProxy(watchable, c, gcWindow)
}
3 changes: 1 addition & 2 deletions internal/datastore/proxy/schemacaching/standardcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func readAndCache[T schemaDefinition](
// sever the context so that another branch doesn't cancel the
// single-flighted read
loaded, updatedRev, err := reader(internaldatastore.SeparateContextWithTracing(ctx), name)
if err != nil && !errors.Is(err, &datastore.ErrNamespaceNotFound{}) && !errors.Is(err, &datastore.ErrCaveatNameNotFound{}) {
if err != nil && !errors.As(err, &datastore.ErrNamespaceNotFound{}) && !errors.As(err, &datastore.ErrCaveatNameNotFound{}) {
// Propagate this error to the caller
return nil, err
}
Expand All @@ -187,7 +187,6 @@ func readAndCache[T schemaDefinition](
// We have to call wait here or else Ristretto may not have the key
// available to a subsequent caller.
r.p.c.Wait()

return entry, nil
})
if err != nil {
Expand Down
93 changes: 76 additions & 17 deletions internal/datastore/proxy/schemacaching/watchingcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"

"github.com/authzed/spicedb/pkg/cache"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/datastore/revision"
Expand Down Expand Up @@ -62,17 +63,24 @@ func init() {
type watchingCachingProxy struct {
datastore.SchemaWatchableDatastore

gcWindow time.Duration
closed chan bool
fallbackCache *definitionCachingProxy
gcWindow time.Duration
closed chan bool

namespaceCache *schemaWatchCache[*core.NamespaceDefinition]
caveatCache *schemaWatchCache[*core.CaveatDefinition]
}

// createWatchingCacheProxy creates and returns a watching cache proxy.
func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, gcWindow time.Duration) *watchingCachingProxy {
func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, c cache.Cache, gcWindow time.Duration) *watchingCachingProxy {
fallbackCache := &definitionCachingProxy{
Datastore: wrapped,
c: c,
}

proxy := &watchingCachingProxy{
SchemaWatchableDatastore: wrapped,
fallbackCache: fallbackCache,

gcWindow: gcWindow,
closed: make(chan bool, 2),
Expand All @@ -81,10 +89,10 @@ func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, gcWind
"namespace",
datastore.NewNamespaceNotFoundErr,
func(ctx context.Context, name string, revision datastore.Revision) (*core.NamespaceDefinition, datastore.Revision, error) {
return wrapped.SnapshotReader(revision).ReadNamespaceByName(ctx, name)
return fallbackCache.SnapshotReader(revision).ReadNamespaceByName(ctx, name)
},
func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[*core.NamespaceDefinition], error) {
return wrapped.SnapshotReader(revision).LookupNamespacesWithNames(ctx, names)
return fallbackCache.SnapshotReader(revision).LookupNamespacesWithNames(ctx, names)
},
definitionsReadCachedCounter,
definitionsReadTotalCounter,
Expand All @@ -94,10 +102,10 @@ func createWatchingCacheProxy(wrapped datastore.SchemaWatchableDatastore, gcWind
"caveat",
datastore.NewCaveatNameNotFoundErr,
func(ctx context.Context, name string, revision datastore.Revision) (*core.CaveatDefinition, datastore.Revision, error) {
return wrapped.SnapshotReader(revision).ReadCaveatByName(ctx, name)
return fallbackCache.SnapshotReader(revision).ReadCaveatByName(ctx, name)
},
func(ctx context.Context, names []string, revision datastore.Revision) ([]datastore.RevisionedDefinition[*core.CaveatDefinition], error) {
return wrapped.SnapshotReader(revision).LookupCaveatsWithNames(ctx, names)
return fallbackCache.SnapshotReader(revision).LookupCaveatsWithNames(ctx, names)
},
definitionsReadCachedCounter,
definitionsReadTotalCounter,
Expand All @@ -117,17 +125,22 @@ func (p *watchingCachingProxy) ReadWriteTx(
f datastore.TxUserFunc,
opts ...options.RWTOptionsOption,
) (datastore.Revision, error) {
return p.SchemaWatchableDatastore.ReadWriteTx(ctx, func(delegateRWT datastore.ReadWriteTransaction) error {
// NOTE: we always use the standard approach cache here, as it stores changes within the transaction
// itself, and should not impact the overall updating cache.
rwt := &definitionCachingRWT{delegateRWT, &sync.Map{}}
return f(rwt)
}, opts...)
// NOTE: we always use the standard approach cache here, as it stores changes within the transaction
// itself, and should not impact the overall updating cache.
return p.fallbackCache.ReadWriteTx(ctx, f, opts...)
}

func (p *watchingCachingProxy) Start(ctx context.Context) error {
log.Info().Msg("starting watching cache")
// Start async so that prepopulating doesn't block the server start.
go func() {
_ = p.startSync(ctx)
}()

return nil
}

func (p *watchingCachingProxy) startSync(ctx context.Context) error {
log.Info().Msg("starting watching cache")
headRev, err := p.SchemaWatchableDatastore.HeadRevision(context.Background())
if err != nil {
p.namespaceCache.setFallbackMode()
Expand Down Expand Up @@ -165,14 +178,60 @@ func (p *watchingCachingProxy) Start(ctx context.Context) error {
// Start watching for schema changes.
go (func() {
log.Debug().Str("revision", headRev.String()).Msg("starting watching cache watch goroutine")
reader := p.SchemaWatchableDatastore.SnapshotReader(headRev)

p.namespaceCache.startAtRevision(headRev)
p.caveatCache.startAtRevision(headRev)
// Populate the cache with all definitions at the head revision.
log.Info().Str("revision", headRev.String()).Msg("prepopulating namespace watching cache")
namespaces, err := reader.ListAllNamespaces(ctx)
if err != nil {
p.namespaceCache.setFallbackMode()
p.caveatCache.setFallbackMode()
log.Warn().Err(err).Msg("received error in schema watch")
wg.Done()
return
}

for _, namespaceDef := range namespaces {
err := p.namespaceCache.updateDefinition(namespaceDef.Definition.Name, namespaceDef.Definition, false, headRev)
if err != nil {
p.namespaceCache.setFallbackMode()
p.caveatCache.setFallbackMode()
log.Warn().Err(err).Msg("received error in schema watch")
wg.Done()
return
}
}
log.Info().Str("revision", headRev.String()).Int("count", len(namespaces)).Msg("populated namespace watching cache")

log.Info().Str("revision", headRev.String()).Msg("prepopulating caveat watching cache")
caveats, err := reader.ListAllCaveats(ctx)
if err != nil {
p.namespaceCache.setFallbackMode()
p.caveatCache.setFallbackMode()
log.Warn().Err(err).Msg("received error in schema watch")
wg.Done()
return
}

for _, caveatDef := range caveats {
err := p.caveatCache.updateDefinition(caveatDef.Definition.Name, caveatDef.Definition, false, headRev)
if err != nil {
p.namespaceCache.setFallbackMode()
p.caveatCache.setFallbackMode()
log.Warn().Err(err).Msg("received error in schema watch")
wg.Done()
return
}
}
log.Info().Str("revision", headRev.String()).Int("count", len(caveats)).Msg("populated caveat watching cache")

log.Debug().Str("revision", headRev.String()).Msg("beginning schema watch")
ssc, serrc := p.SchemaWatchableDatastore.WatchSchema(context.Background(), headRev)
log.Debug().Msg("schema watch started")

p.namespaceCache.startAtRevision(headRev)
p.caveatCache.startAtRevision(headRev)

wg.Done()

for {
Expand Down Expand Up @@ -262,6 +321,7 @@ func (p *watchingCachingProxy) Close() error {
p.closed <- true
p.closed <- true

p.fallbackCache.Close()
return p.SchemaWatchableDatastore.Close()
}

Expand Down Expand Up @@ -352,7 +412,6 @@ func (swc *schemaWatchCache[T]) startAtRevision(revision datastore.Revision) {
swc.lock.Lock()
defer swc.lock.Unlock()

swc.entries = map[string]*intervalTracker[revisionedEntry[T]]{}
swc.checkpointRevision = revision
swc.inFallbackMode = false

Expand Down
Loading

0 comments on commit 9c446e5

Please sign in to comment.