From f261a3419abad2475a0b17c7ecbe20cc3c838158 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 11 Jul 2024 04:23:08 +0100
Subject: [PATCH] add support for configurable dispatch chunk size

We've observed improvements on LookupResources (LR) workloads when
increasing the dispatch chunk size.
---
 internal/caveats/run.go | 10 +----
 internal/datastore/common/sql.go | 12 +++---
 internal/datastore/common/sql_test.go | 4 +-
 internal/datastore/crdb/crdb.go | 10 +++--
 internal/datastore/crdb/options.go | 11 +++++-
 internal/datastore/crdb/reader.go | 17 +++++----
 internal/datastore/mysql/datastore.go | 3 ++
 internal/datastore/mysql/options.go | 8 ++++
 internal/datastore/mysql/reader.go | 13 ++++---
 internal/datastore/postgres/options.go | 8 ++++
 internal/datastore/postgres/postgres.go | 12 ++++--
 internal/datastore/postgres/reader.go | 13 ++++---
 internal/datastore/spanner/options.go | 6 +++
 internal/datastore/spanner/reader.go | 11 +++---
 internal/datastore/spanner/spanner.go | 5 ++-
 internal/dispatch/cluster/cluster.go | 10 ++++-
 internal/dispatch/combined/combined.go | 10 ++++-
 internal/dispatch/graph/check_test.go | 14 +++----
 internal/dispatch/graph/expand_test.go | 2 +-
 internal/dispatch/graph/graph.go | 28 +++++++-------
 .../dispatch/graph/lookupresources2_test.go | 10 ++---
 .../dispatch/graph/lookupresources_test.go | 8 ++--
 .../dispatch/graph/lookupsubjects_test.go | 4 +-
 .../dispatch/graph/reachableresources_test.go | 20 +++++-----
 internal/graph/check.go | 37 ++++++++++---------
 internal/graph/checkingresourcestream.go | 21 +++++++----
 internal/graph/computed/computecheck.go | 9 +++--
 internal/graph/computed/computecheck_test.go | 9 +++--
 internal/graph/graph.go | 21 -----------
 internal/graph/graph_test.go | 22 -----------
 internal/graph/lookupresources.go | 13 ++++---
 internal/graph/lookupresources2.go | 20 +++++-----
 internal/graph/lookupsubjects.go | 11 +++---
 internal/graph/lr2streams.go | 8 ++--
 internal/graph/reachableresources.go | 15 ++++----
 .../services/integrationtesting/cert_test.go | 2 +-
 .../integrationtesting/consistency_test.go | 16 ++++----
 .../consistencytestutil/accessibilityset.go | 3 +-
 .../consistencytestutil/clusteranddata.go | 4 +-
 internal/services/v1/bulkcheck.go | 7 ++--
 internal/services/v1/experimental.go | 2 +-
 internal/services/v1/experimental_test.go | 18 +++++----
 internal/services/v1/grouping.go | 2 -
 internal/services/v1/permissions.go | 1 +
 internal/services/v1/permissions_test.go | 16 ++++----
 internal/services/v1/relationships.go | 5 +++
 internal/testserver/server.go | 2 +-
 pkg/cmd/datastore/datastore.go | 6 +++
 pkg/cmd/datastore/zz_generated.options.go | 8 ++++
 pkg/cmd/serve.go | 1 +
 pkg/cmd/server/server.go | 9 ++++-
 pkg/cmd/server/zz_generated.options.go | 9 +++++
 pkg/cmd/testserver/testserver.go | 9 ++++-
 pkg/datastore/datastore.go | 4 --
 pkg/development/check.go | 3 ++
 pkg/development/devcontext.go | 2 +-
 56 files changed, 319 insertions(+), 245 deletions(-)
 delete mode 100644 internal/graph/graph_test.go

diff --git a/internal/caveats/run.go b/internal/caveats/run.go
index f57e29f960..3481bd85c0 100644
--- a/internal/caveats/run.go
+++ b/internal/caveats/run.go
@@ -221,19 +221,11 @@ func runExpressionWithCaveats(
 	var exprStringPieces []string

 	var currentResult ExpressionResult = syntheticResult{
-		value:                false,
+		value:                cop.Op == core.CaveatOperation_AND,
 		contextValues:        map[string]any{},
 		exprString:           "",
 		missingContextParams: []string{},
 	}
-	if cop.Op == core.CaveatOperation_AND {
-		currentResult = syntheticResult{
-			value:                true,
-			contextValues:        map[string]any{},
-			exprString:           "",
-			missingContextParams: []string{},
-		}
-	}

 	buildExprString := func() (string, error) {
 		if debugOption != RunCaveatExpressionWithDebugInformation {
diff --git a/internal/datastore/common/sql.go b/internal/datastore/common/sql.go
index fa151a546a..c3530da7d3 100644
--- a/internal/datastore/common/sql.go
+++ b/internal/datastore/common/sql.go
@@ -102,14 +102,16 @@ type SchemaQueryFilterer struct {
 	schema                SchemaInformation
 	queryBuilder          sq.SelectBuilder
 	filteringColumnCounts map[string]int
+	filterMaximumIDCount  uint16
 }

 // NewSchemaQueryFilterer creates a new SchemaQueryFilterer object.
-func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder) SchemaQueryFilterer {
+func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder, filterMaximumIDCount uint16) SchemaQueryFilterer {
 	return SchemaQueryFilterer{
 		schema:                schema,
 		queryBuilder:          initialQuery,
 		filteringColumnCounts: map[string]int{},
+		filterMaximumIDCount:  filterMaximumIDCount,
 	}
 }

@@ -306,8 +308,8 @@ func (sqf SchemaQueryFilterer) MustFilterWithResourceIDPrefix(prefix string) Sch
 // FilterToResourceIDs returns a new SchemaQueryFilterer that is limited to resources with any of the
 // specified IDs.
 func (sqf SchemaQueryFilterer) FilterToResourceIDs(resourceIds []string) (SchemaQueryFilterer, error) {
-	if len(resourceIds) > int(datastore.FilterMaximumIDCount) {
-		return sqf, spiceerrors.MustBugf("cannot have more than %d resources IDs in a single filter", datastore.FilterMaximumIDCount)
+	if len(resourceIds) > int(sqf.filterMaximumIDCount) {
+		return sqf, spiceerrors.MustBugf("cannot have more than %d resource IDs in a single filter", sqf.filterMaximumIDCount)
 	}

 	var builder strings.Builder
@@ -422,8 +424,8 @@ func (sqf SchemaQueryFilterer) FilterWithSubjectsSelectors(selectors ...datastor
 	}

 	if len(selector.OptionalSubjectIds) > 0 {
-		if len(selector.OptionalSubjectIds) > int(datastore.FilterMaximumIDCount) {
-			return sqf, spiceerrors.MustBugf("cannot have more than %d subject IDs in a single filter", datastore.FilterMaximumIDCount)
+		if len(selector.OptionalSubjectIds) > int(sqf.filterMaximumIDCount) {
+			return sqf, spiceerrors.MustBugf("cannot have more than %d subject IDs in a single filter", sqf.filterMaximumIDCount)
 		}

 		var builder strings.Builder
diff --git a/internal/datastore/common/sql_test.go b/internal/datastore/common/sql_test.go
index 1d84beb232..8edf06b9d8 100644
--- a/internal/datastore/common/sql_test.go
+++ b/internal/datastore/common/sql_test.go
@@ -669,7 +669,7 @@ func TestSchemaQueryFilterer(t *testing.T) {
 				"caveat",
 				TupleComparison,
 			)
-			filterer := NewSchemaQueryFilterer(schema, base)
+			filterer := NewSchemaQueryFilterer(schema, base, 100)

 			ran := test.run(filterer)
 			require.Equal(t, test.expectedColumnCounts, ran.filteringColumnCounts)
@@ -693,7 +693,7 @@ func BenchmarkSchemaFilterer(b *testing.B) {
 		"caveat_name",
 		TupleComparison,
 	)
-	sqf := NewSchemaQueryFilterer(si, sq.Select("*"))
+	sqf := NewSchemaQueryFilterer(si, sq.Select("*"), 100)
 	var names []string
 	for i := 0; i < 500; i++ {
 		names = append(names, uuid.NewString())
diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go
index 1c66f095f8..92b161f6b5 100644
--- a/internal/datastore/crdb/crdb.go
+++ b/internal/datastore/crdb/crdb.go
@@ -275,9 +275,10 @@ type crdbDatastore struct {

 	featureGroup singleflight.Group[string, *datastore.Features]

-	pruneGroup *errgroup.Group
-	ctx        context.Context
-	cancel     context.CancelFunc
+	pruneGroup           *errgroup.Group
+	ctx                  context.Context
+	cancel               context.CancelFunc
+	filterMaximumIDCount uint16
 }

 func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
@@ -289,7 +290,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
 		return query.From(fromStr + " AS OF SYSTEM TIME " + rev.String())
 	}

-	return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder}
+	return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder, cds.filterMaximumIDCount}
 }

 func (cds *crdbDatastore) ReadWriteTx(
@@ -319,6 +320,7 @@ func (cds *crdbDatastore) ReadWriteTx(
 			func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder {
 				return query.From(fromStr)
 			},
+			cds.filterMaximumIDCount,
 		},
 		tx,
 		0,
diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go
index 366b040733..16cc330997 100644
--- a/internal/datastore/crdb/options.go
+++ b/internal/datastore/crdb/options.go
@@ -22,8 +22,8 @@ type crdbOptions struct {
 	overlapKey                string
 	enableConnectionBalancing bool
 	analyzeBeforeStatistics   bool
-
-	enablePrometheusStats bool
+	filterMaximumIDCount  uint16
+	enablePrometheusStats bool
 }

 const (
@@ -48,6 +48,7 @@ const (
 	defaultEnablePrometheusStats     = false
 	defaultEnableConnectionBalancing = true
 	defaultConnectRate               = 100 * time.Millisecond
+	defaultFilterMaximumIDCount      = 100
 )

 // Option provides the facility to configure how clients within the CRDB
@@ -68,6 +69,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
 		enablePrometheusStats:     defaultEnablePrometheusStats,
 		enableConnectionBalancing: defaultEnableConnectionBalancing,
 		connectRate:               defaultConnectRate,
+		filterMaximumIDCount:      defaultFilterMaximumIDCount,
 	}

 	for _, option := range options {
@@ -306,3 +308,8 @@ func WithEnableConnectionBalancing(connectionBalancing bool) Option {
 func DebugAnalyzeBeforeStatistics() Option {
 	return func(po *crdbOptions) { po.analyzeBeforeStatistics = true }
 }
+
+// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
+func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
+	return func(po *crdbOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
+}
diff --git a/internal/datastore/crdb/reader.go b/internal/datastore/crdb/reader.go
index fc9397af19..312f86e282 100644
--- a/internal/datastore/crdb/reader.go
+++ b/internal/datastore/crdb/reader.go
@@ -60,11 +60,12 @@ var (
 )

 type crdbReader struct {
-	query         pgxcommon.DBFuncQuerier
-	executor      common.QueryExecutor
-	keyer         overlapKeyer
-	overlapKeySet keySet
-	fromBuilder   func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder
+	query                pgxcommon.DBFuncQuerier
+	executor             common.QueryExecutor
+	keyer                overlapKeyer
+	overlapKeySet        keySet
+	fromBuilder          func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder
+	filterMaximumIDCount uint16
 }

 func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int, error) {
@@ -83,7 +84,7 @@ func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int,
 	}

 	query := cr.fromBuilder(countTuples, tableTuple)
-	builder, err := common.NewSchemaQueryFilterer(schema, query).FilterWithRelationshipsFilter(relFilter)
+	builder, err := common.NewSchemaQueryFilterer(schema, query, cr.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
 	if err != nil {
 		return 0, err
 	}
@@ -208,7 +209,7 @@ func (cr *crdbReader) QueryRelationships(
 	opts ...options.QueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
 	query := cr.fromBuilder(queryTuples, tableTuple)
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, query).FilterWithRelationshipsFilter(filter)
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, query, cr.filterMaximumIDCount).FilterWithRelationshipsFilter(filter)
 	if err != nil {
 		return nil, err
 	}
@@ -222,7 +223,7 @@ func (cr *crdbReader) ReverseQueryRelationships(
 	opts ...options.ReverseQueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
 	query := cr.fromBuilder(queryTuples, tableTuple)
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, query).
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, query, cr.filterMaximumIDCount).
 		FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
 	if err != nil {
 		return nil, err
diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go
index 6bc70bd64a..821380dbac 100644
--- a/internal/datastore/mysql/datastore.go
+++ b/internal/datastore/mysql/datastore.go
@@ -319,6 +319,7 @@ func (mds *Datastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
 		createTxFunc,
 		executor,
 		buildLivingObjectFilterForRevision(rev),
+		mds.filterMaximumIDCount,
 	}
 }

@@ -356,6 +357,7 @@ func (mds *Datastore) ReadWriteTx(
 				longLivedTx,
 				executor,
 				currentlyLivingObjects,
+				mds.filterMaximumIDCount,
 			},
 			mds.driver.RelationTuple(),
 			tx,
@@ -484,6 +486,7 @@ type Datastore struct {
 	watchBufferLength       uint16
 	watchBufferWriteTimeout time.Duration
 	maxRetries              uint8
+	filterMaximumIDCount    uint16

 	optimizedRevisionQuery string
 	validTransactionQuery  string
diff --git a/internal/datastore/mysql/options.go b/internal/datastore/mysql/options.go
index 00e7858a58..440836e88e 100644
--- a/internal/datastore/mysql/options.go
+++ b/internal/datastore/mysql/options.go
@@ -22,6 +22,7 @@ const (
 	defaultMaxRetries              = 8
 	defaultGCEnabled               = true
 	defaultCredentialsProviderName = ""
+	defaultFilterMaximumIDCount    = 100
 )

 type mysqlOptions struct {
@@ -42,6 +43,7 @@ type mysqlOptions struct {
 	lockWaitTimeoutSeconds  *uint8
 	gcEnabled               bool
 	credentialsProviderName string
+	filterMaximumIDCount    uint16
 }

 // Option provides the facility to configure how clients within the
@@ -64,6 +66,7 @@ func generateConfig(options []Option) (mysqlOptions, error) {
 		maxRetries:              defaultMaxRetries,
 		gcEnabled:               defaultGCEnabled,
 		credentialsProviderName: defaultCredentialsProviderName,
+		filterMaximumIDCount:    defaultFilterMaximumIDCount,
 	}

 	for _, option := range options {
@@ -247,3 +250,8 @@ func GCMaxOperationTime(time time.Duration) Option {
 func CredentialsProviderName(credentialsProviderName string) Option {
 	return func(mo *mysqlOptions) { mo.credentialsProviderName = credentialsProviderName }
 }
+
+// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
+func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
+	return func(mo *mysqlOptions) { mo.filterMaximumIDCount = filterMaximumIDCount }
+}
diff --git a/internal/datastore/mysql/reader.go b/internal/datastore/mysql/reader.go
index 95ed9afc5e..ce1ee33a3f 100644
--- a/internal/datastore/mysql/reader.go
+++ b/internal/datastore/mysql/reader.go
@@ -22,9 +22,10 @@ type txFactory func(context.Context) (*sql.Tx, txCleanupFunc, error)
 type mysqlReader struct {
 	*QueryBuilder

-	txSource txFactory
-	executor common.QueryExecutor
-	filterer queryFilterer
+	txSource             txFactory
+	executor             common.QueryExecutor
+	filterer             queryFilterer
+	filterMaximumIDCount uint16
 }

 type queryFilterer func(original sq.SelectBuilder) sq.SelectBuilder
@@ -65,7 +66,7 @@ func (mr *mysqlReader) CountRelationships(ctx context.Context, name string) (int
 		return 0, err
 	}

-	qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.CountTupleQuery)).FilterWithRelationshipsFilter(relFilter)
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.CountTupleQuery), mr.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
 	if err != nil {
 		return 0, err
 	}
@@ -174,7 +175,7 @@ func (mr *mysqlReader) QueryRelationships(
 	filter datastore.RelationshipsFilter,
 	opts ...options.QueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery)).FilterWithRelationshipsFilter(filter)
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery), mr.filterMaximumIDCount).FilterWithRelationshipsFilter(filter)
 	if err != nil {
 		return nil, err
 	}
@@ -187,7 +188,7 @@ func (mr *mysqlReader) ReverseQueryRelationships(
 	subjectsFilter datastore.SubjectsFilter,
 	opts ...options.ReverseQueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery)).
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery), mr.filterMaximumIDCount).
 		FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
 	if err != nil {
 		return nil, err
diff --git a/internal/datastore/postgres/options.go b/internal/datastore/postgres/options.go
index 9fe42d6085..54c8208d41 100644
--- a/internal/datastore/postgres/options.go
+++ b/internal/datastore/postgres/options.go
@@ -21,6 +21,7 @@ type postgresOptions struct {
 	gcInterval           time.Duration
 	gcMaxOperationTime   time.Duration
 	maxRetries           uint8
+	filterMaximumIDCount uint16

 	enablePrometheusStats   bool
 	analyzeBeforeStatistics bool
@@ -63,6 +64,7 @@ const (
 	defaultGCEnabled               = true
 	defaultCredentialsProviderName = ""
 	defaultReadStrictMode          = false
+	defaultFilterMaximumIDCount    = 100
 )

 // Option provides the facility to configure how clients within the
@@ -84,6 +86,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
 		credentialsProviderName: defaultCredentialsProviderName,
 		readStrictMode:          defaultReadStrictMode,
 		queryInterceptor:        nil,
+		filterMaximumIDCount:    defaultFilterMaximumIDCount,
 	}

 	for _, option := range options {
@@ -356,3 +359,8 @@ func MigrationPhase(phase string) Option {
 func CredentialsProviderName(credentialsProviderName string) Option {
 	return func(po *postgresOptions) { po.credentialsProviderName = credentialsProviderName }
 }
+
+// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
+func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
+	return func(po *postgresOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
+}
diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go
index af8f64b7b2..c1a48c6619 100644
--- a/internal/datastore/postgres/postgres.go
+++ b/internal/datastore/postgres/postgres.go
@@ -330,6 +330,7 @@ func newPostgresDatastore(
 		credentialsProvider:  credentialsProvider,
 		isPrimary:            isPrimary,
 		inStrictReadMode:     config.readStrictMode,
+		filterMaximumIDCount: config.filterMaximumIDCount,
 	}

 	if isPrimary && config.readStrictMode {
@@ -384,10 +385,11 @@ type pgDatastore struct {

 	credentialsProvider datastore.CredentialsProvider

-	gcGroup  *errgroup.Group
-	gcCtx    context.Context
-	cancelGc context.CancelFunc
-	gcHasRun atomic.Bool
+	gcGroup              *errgroup.Group
+	gcCtx                context.Context
+	cancelGc             context.CancelFunc
+	gcHasRun             atomic.Bool
+	filterMaximumIDCount uint16
 }

 func (pgd *pgDatastore) IsStrictReadModeEnabled() bool {
@@ -410,6 +412,7 @@ func (pgd *pgDatastore) SnapshotReader(revRaw datastore.Revision) datastore.Read
 		queryFuncs,
 		executor,
 		buildLivingObjectFilterForRevision(rev),
+		pgd.filterMaximumIDCount,
 	}
 }

@@ -447,6 +450,7 @@ func (pgd *pgDatastore) ReadWriteTx(
 				queryFuncs,
 				executor,
 				currentlyLivingObjects,
+				pgd.filterMaximumIDCount,
 			},
 			tx,
 			newXID,
diff --git a/internal/datastore/postgres/reader.go b/internal/datastore/postgres/reader.go
index 7df79a924a..8297bbaa64 100644
--- a/internal/datastore/postgres/reader.go
+++ b/internal/datastore/postgres/reader.go
@@ -16,9 +16,10 @@ import (
 )

 type pgReader struct {
-	query    pgxcommon.DBFuncQuerier
-	executor common.QueryExecutor
-	filterer queryFilterer
+	query                pgxcommon.DBFuncQuerier
+	executor             common.QueryExecutor
+	filterer             queryFilterer
+	filterMaximumIDCount uint16
 }

 type queryFilterer func(original sq.SelectBuilder) sq.SelectBuilder
@@ -81,7 +82,7 @@ func (r *pgReader) CountRelationships(ctx context.Context, name string) (int, er
 		return 0, err
 	}

-	qBuilder, err := common.NewSchemaQueryFilterer(schema, r.filterer(countTuples)).FilterWithRelationshipsFilter(relFilter)
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, r.filterer(countTuples), r.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
 	if err != nil {
 		return 0, err
 	}
@@ -169,7 +170,7 @@ func (r *pgReader) QueryRelationships(
 	filter datastore.RelationshipsFilter,
 	opts ...options.QueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, r.filterer(queryTuples)).FilterWithRelationshipsFilter(filter)
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, r.filterer(queryTuples), r.filterMaximumIDCount).FilterWithRelationshipsFilter(filter)
 	if err != nil {
 		return nil, err
 	}
@@ -182,7 +183,7 @@ func (r *pgReader) ReverseQueryRelationships(
 	subjectsFilter datastore.SubjectsFilter,
 	opts ...options.ReverseQueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, r.filterer(queryTuples)).
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, r.filterer(queryTuples), r.filterMaximumIDCount).
 		FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
 	if err != nil {
 		return nil, err
diff --git a/internal/datastore/spanner/options.go b/internal/datastore/spanner/options.go
index 150d7ec1d9..00a5ba3499 100644
--- a/internal/datastore/spanner/options.go
+++ b/internal/datastore/spanner/options.go
@@ -22,6 +22,7 @@ type spannerOptions struct {
 	minSessions          uint64
 	maxSessions          uint64
 	migrationPhase       string
+	filterMaximumIDCount uint16
 }

 type migrationPhase uint8
@@ -202,3 +203,8 @@ func MaxSessionCount(maxSessions uint64) Option {
 func MigrationPhase(phase string) Option {
 	return func(po *spannerOptions) { po.migrationPhase = phase }
 }
+
+// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
+func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
+	return func(po *spannerOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
+}
diff --git a/internal/datastore/spanner/reader.go b/internal/datastore/spanner/reader.go
index 082bda9bb8..5d0f63e00d 100644
--- a/internal/datastore/spanner/reader.go
+++ b/internal/datastore/spanner/reader.go
@@ -29,8 +29,9 @@ type readTX interface {
 type txFactory func() readTX

 type spannerReader struct {
-	executor common.QueryExecutor
-	txSource txFactory
+	executor             common.QueryExecutor
+	txSource             txFactory
+	filterMaximumIDCount uint16
 }

 func (sr spannerReader) CountRelationships(ctx context.Context, name string) (int, error) {
@@ -51,7 +52,7 @@ func (sr spannerReader) CountRelationships(ctx context.Context, name string) (in
 		return 0, err
 	}

-	builder, err := common.NewSchemaQueryFilterer(schema, countTuples).FilterWithRelationshipsFilter(relFilter)
+	builder, err := common.NewSchemaQueryFilterer(schema, countTuples, sr.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
 	if err != nil {
 		return 0, err
 	}
@@ -131,7 +132,7 @@ func (sr spannerReader) QueryRelationships(
 	filter datastore.RelationshipsFilter,
 	opts ...options.QueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, queryTuples).FilterWithRelationshipsFilter(filter)
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, queryTuples, sr.filterMaximumIDCount).FilterWithRelationshipsFilter(filter)
 	if err != nil {
 		return nil, err
 	}
@@ -144,7 +145,7 @@ func (sr spannerReader) ReverseQueryRelationships(
 	subjectsFilter datastore.SubjectsFilter,
 	opts ...options.ReverseQueryOptionsOption,
 ) (iter datastore.RelationshipIterator, err error) {
-	qBuilder, err := common.NewSchemaQueryFilterer(schema, queryTuples).
+	qBuilder, err := common.NewSchemaQueryFilterer(schema, queryTuples, sr.filterMaximumIDCount).
 		FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
 	if err != nil {
 		return nil, err
diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go
index 3dce5403aa..0ebb6cb2b5 100644
--- a/internal/datastore/spanner/spanner.go
+++ b/internal/datastore/spanner/spanner.go
@@ -91,6 +91,7 @@ type spannerDatastore struct {
 	cachedEstimatedBytesPerRelationship uint64

 	tableSizesStatsTable string
+	filterMaximumIDCount uint16
 }

 // NewSpannerDatastore returns a datastore backed by cloud spanner
@@ -234,7 +235,7 @@ func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datas
 		return &traceableRTX{delegate: sd.client.Single().WithTimestampBound(spanner.ReadTimestamp(r.Time()))}
 	}
 	executor := common.QueryExecutor{Executor: queryExecutor(txSource)}
-	return spannerReader{executor, txSource}
+	return spannerReader{executor, txSource, sd.filterMaximumIDCount}
 }

 func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
@@ -251,7 +252,7 @@ func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUser
 		executor := common.QueryExecutor{Executor: queryExecutor(txSource)}
 		rwt := spannerReadWriteTXN{
-			spannerReader{executor, txSource},
+			spannerReader{executor, txSource, sd.filterMaximumIDCount},
 			spannerRWT,
 		}
 		err := func() error {
diff --git a/internal/dispatch/cluster/cluster.go b/internal/dispatch/cluster/cluster.go
index edc69952ff..49e45445fb 100644
--- a/internal/dispatch/cluster/cluster.go
+++ b/internal/dispatch/cluster/cluster.go
@@ -19,6 +19,7 @@ type optionState struct {
 	cache                 cache.Cache
 	concurrencyLimits     graph.ConcurrencyLimits
 	remoteDispatchTimeout time.Duration
+	dispatchChunkSize     uint16
 }

 // MetricsEnabled enables issuing prometheus metrics
@@ -49,6 +50,13 @@ func ConcurrencyLimits(limits graph.ConcurrencyLimits) Option {
 	}
 }

+// DispatchChunkSize sets the maximum number of items to be dispatched in a single dispatch request
+func DispatchChunkSize(dispatchChunkSize uint16) Option {
+	return func(state *optionState) {
+		state.dispatchChunkSize = dispatchChunkSize
+	}
+}
+
 // RemoteDispatchTimeout sets the maximum timeout for a remote dispatch.
 // Defaults to 60s (as defined in the remote dispatcher).
 func RemoteDispatchTimeout(remoteDispatchTimeout time.Duration) Option {
@@ -66,7 +74,7 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
 		fn(&opts)
 	}

-	clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits)
+	clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits, opts.dispatchChunkSize)

 	if opts.prometheusSubsystem == "" {
 		opts.prometheusSubsystem = "dispatch"
diff --git a/internal/dispatch/combined/combined.go b/internal/dispatch/combined/combined.go
index cf87cd3bf4..7bc050b93c 100644
--- a/internal/dispatch/combined/combined.go
+++ b/internal/dispatch/combined/combined.go
@@ -38,6 +38,7 @@ type optionState struct {
 	remoteDispatchTimeout  time.Duration
 	secondaryUpstreamAddrs map[string]string
 	secondaryUpstreamExprs map[string]string
+	dispatchChunkSize      uint16
 }

 // MetricsEnabled enables issuing prometheus metrics
@@ -116,6 +117,13 @@ func ConcurrencyLimits(limits graph.ConcurrencyLimits) Option {
 	}
 }

+// DispatchChunkSize sets the maximum number of items to be dispatched in a single dispatch request
+func DispatchChunkSize(dispatchChunkSize uint16) Option {
+	return func(state *optionState) {
+		state.dispatchChunkSize = dispatchChunkSize
+	}
+}
+
 // RemoteDispatchTimeout sets the maximum timeout for a remote dispatch.
 // Defaults to 60s (as defined in the remote dispatcher).
 func RemoteDispatchTimeout(remoteDispatchTimeout time.Duration) Option {
@@ -142,7 +150,7 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
 		return nil, err
 	}

-	redispatch := graph.NewDispatcher(cachingRedispatch, opts.concurrencyLimits)
+	redispatch := graph.NewDispatcher(cachingRedispatch, opts.concurrencyLimits, opts.dispatchChunkSize)
 	redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{})

 	// If an upstream is specified, create a cluster dispatcher.
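For illustration only (not part of the diff above), a caller inside this repository would wire the new option through the same functional-options pattern as the existing concurrency-limit option; the values shown are arbitrary, with 100 matching the defaults this patch introduces:

// Illustrative sketch, assuming this repository's internal packages.
package main

import (
	"log"

	"github.com/authzed/spicedb/internal/dispatch/combined"
	"github.com/authzed/spicedb/internal/dispatch/graph"
)

func main() {
	// DispatchChunkSize flows from the option state into graph.NewDispatcher,
	// which hands it to each subrequest handler (check, LR, LS, etc.).
	disp, err := combined.NewDispatcher(
		combined.ConcurrencyLimits(graph.SharedConcurrencyLimits(50)),
		combined.DispatchChunkSize(100),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer disp.Close()
}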
diff --git a/internal/dispatch/graph/check_test.go b/internal/dispatch/graph/check_test.go
index 3d1a0bf75d..824a9295c2 100644
--- a/internal/dispatch/graph/check_test.go
+++ b/internal/dispatch/graph/check_test.go
@@ -172,7 +172,7 @@ func TestMaxDepth(t *testing.T) {
 	revision, err := common.UpdateTuplesInDatastore(ctx, ds, mutation)
 	require.NoError(err)

-	dispatch := NewLocalOnlyDispatcher(10)
+	dispatch := NewLocalOnlyDispatcher(10, 100)

 	_, err = dispatch.DispatchCheck(ctx, &v1.DispatchCheckRequest{
 		ResourceRelation: RR("folder", "view"),
@@ -1217,7 +1217,7 @@ func TestCheckPermissionOverSchema(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			require := require.New(t)

-			dispatcher := NewLocalOnlyDispatcher(10)
+			dispatcher := NewLocalOnlyDispatcher(10, 100)

 			ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 			require.NoError(err)
@@ -1552,7 +1552,7 @@ func TestCheckWithHints(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			require := require.New(t)

-			dispatcher := NewLocalOnlyDispatcher(10)
+			dispatcher := NewLocalOnlyDispatcher(10, 100)

 			ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 			require.NoError(err)
@@ -1596,7 +1596,7 @@ func TestCheckHintsPartialApplication(t *testing.T) {
 	require := require.New(t)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)

 	ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 	require.NoError(err)
@@ -1641,7 +1641,7 @@ func TestCheckHintsPartialApplicationOverArrow(t *testing.T) {
 	require := require.New(t)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)

 	ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 	require.NoError(err)
@@ -1694,7 +1694,7 @@ func newLocalDispatcherWithConcurrencyLimit(t testing.TB, concurrencyLimit uint1
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))

-	dispatch := NewLocalOnlyDispatcher(concurrencyLimit)
+	dispatch := NewLocalOnlyDispatcher(concurrencyLimit, 100)

 	cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
 	cachingDispatcher.SetDelegate(dispatch)
@@ -1716,7 +1716,7 @@ func newLocalDispatcherWithSchemaAndRels(t testing.TB, schema string, rels []*co
 	ds, revision := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, schema, rels, require.New(t))

-	dispatch := NewLocalOnlyDispatcher(10)
+	dispatch := NewLocalOnlyDispatcher(10, 100)

 	cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
 	cachingDispatcher.SetDelegate(dispatch)
diff --git a/internal/dispatch/graph/expand_test.go b/internal/dispatch/graph/expand_test.go
index 99b26526f3..592de1d505 100644
--- a/internal/dispatch/graph/expand_test.go
+++ b/internal/dispatch/graph/expand_test.go
@@ -291,7 +291,7 @@ func TestMaxDepthExpand(t *testing.T) {
 	require.NoError(err)
 	require.NoError(datastoremw.SetInContext(ctx, ds))

-	dispatch := NewLocalOnlyDispatcher(10)
+	dispatch := NewLocalOnlyDispatcher(10, 100)

 	_, err = dispatch.DispatchExpand(ctx, &v1.DispatchExpandRequest{
 		ResourceAndRelation: ONR("folder", "oops", "view"),
diff --git a/internal/dispatch/graph/graph.go b/internal/dispatch/graph/graph.go
index 11a1564afb..629186c50c 100644
--- a/internal/dispatch/graph/graph.go
+++ b/internal/dispatch/graph/graph.go
@@ -78,38 +78,38 @@ func SharedConcurrencyLimits(concurrencyLimit uint16) ConcurrencyLimits {
 }

 // NewLocalOnlyDispatcher creates a dispatcher that consults with the graph to formulate a response.
-func NewLocalOnlyDispatcher(concurrencyLimit uint16) dispatch.Dispatcher {
-	return NewLocalOnlyDispatcherWithLimits(SharedConcurrencyLimits(concurrencyLimit))
+func NewLocalOnlyDispatcher(concurrencyLimit uint16, dispatchChunkSize uint16) dispatch.Dispatcher {
+	return NewLocalOnlyDispatcherWithLimits(SharedConcurrencyLimits(concurrencyLimit), dispatchChunkSize)
 }

 // NewLocalOnlyDispatcherWithLimits creates a dispatcher that consults with the graph to formulate a response
 // and has the defined concurrency limits per dispatch type.
-func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits) dispatch.Dispatcher {
+func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
 	d := &localDispatcher{}

 	concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)

-	d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check)
+	d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, dispatchChunkSize)
 	d.expander = graph.NewConcurrentExpander(d)
-	d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources)
-	d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources)
-	d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects)
-	d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources)
+	d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, dispatchChunkSize)
+	d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, dispatchChunkSize)
+	d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, dispatchChunkSize)
+	d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, dispatchChunkSize)

 	return d
 }

 // NewDispatcher creates a dispatcher that consults with the graph and redispatches subproblems to
 // the provided redispatcher.
-func NewDispatcher(redispatcher dispatch.Dispatcher, concurrencyLimits ConcurrencyLimits) dispatch.Dispatcher {
+func NewDispatcher(redispatcher dispatch.Dispatcher, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
 	concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
-	checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check)
+	checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, dispatchChunkSize)
 	expander := graph.NewConcurrentExpander(redispatcher)
-	reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources)
-	lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources)
-	lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects)
-	lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources)
+	reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, dispatchChunkSize)
+	lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize)
+	lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, dispatchChunkSize)
+	lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize)

 	return &localDispatcher{
 		checker: checker,
diff --git a/internal/dispatch/graph/lookupresources2_test.go b/internal/dispatch/graph/lookupresources2_test.go
index 58ec963d17..83b2ca2697 100644
--- a/internal/dispatch/graph/lookupresources2_test.go
+++ b/internal/dispatch/graph/lookupresources2_test.go
@@ -313,7 +313,7 @@ func TestMaxDepthLookup2(t *testing.T) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)
 	defer dispatcher.Close()

 	ctx := datastoremw.ContextWithHandle(context.Background())
@@ -534,7 +534,7 @@ func TestLookupResources2OverSchemaWithCursors(t *testing.T) {
 			t.Run(fmt.Sprintf("ps-%d_", pageSize), func(t *testing.T) {
 				require := require.New(t)

-				dispatcher := NewLocalOnlyDispatcher(10)
+				dispatcher := NewLocalOnlyDispatcher(10, 100)

 				ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 				require.NoError(err)
@@ -597,7 +597,7 @@ func TestLookupResources2ImmediateTimeout(t *testing.T) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)
 	defer dispatcher.Close()

 	ctx := datastoremw.ContextWithHandle(context.Background())
@@ -632,7 +632,7 @@ func TestLookupResources2WithError(t *testing.T) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)
 	defer dispatcher.Close()

 	ctx := datastoremw.ContextWithHandle(context.Background())
@@ -797,7 +797,7 @@ func TestLookupResources2EnsureCheckHints(t *testing.T) {
 			checkingDS := disallowedWrapper{ds, tc.disallowedQueries}

-			dispatcher := NewLocalOnlyDispatcher(10)
+			dispatcher := NewLocalOnlyDispatcher(10, 100)
 			defer dispatcher.Close()

 			ctx := datastoremw.ContextWithHandle(context.Background())
diff --git a/internal/dispatch/graph/lookupresources_test.go b/internal/dispatch/graph/lookupresources_test.go
index d7077a50af..6fa6c90ad7 100644
--- a/internal/dispatch/graph/lookupresources_test.go
+++ b/internal/dispatch/graph/lookupresources_test.go
@@ -314,7 +314,7 @@ func TestMaxDepthLookup(t *testing.T) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)
 	defer dispatcher.Close()

 	ctx := datastoremw.ContextWithHandle(context.Background())
@@ -594,7 +594,7 @@ func TestLookupResourcesOverSchemaWithCursors(t *testing.T) {
 			t.Run(fmt.Sprintf("ps-%d_", pageSize), func(t *testing.T) {
 				require := require.New(t)

-				dispatcher := NewLocalOnlyDispatcher(10)
+				dispatcher := NewLocalOnlyDispatcher(10, 100)

 				ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 				require.NoError(err)
@@ -655,7 +655,7 @@ func TestLookupResourcesImmediateTimeout(t *testing.T) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)
 	defer dispatcher.Close()

 	ctx := datastoremw.ContextWithHandle(context.Background())
@@ -688,7 +688,7 @@ func TestLookupResourcesWithError(t *testing.T) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)
 	defer dispatcher.Close()

 	ctx := datastoremw.ContextWithHandle(context.Background())
diff --git a/internal/dispatch/graph/lookupsubjects_test.go b/internal/dispatch/graph/lookupsubjects_test.go
index 15ade5e692..ad2ddca1b2 100644
--- a/internal/dispatch/graph/lookupsubjects_test.go
+++ b/internal/dispatch/graph/lookupsubjects_test.go
@@ -208,7 +208,7 @@ func TestLookupSubjectsMaxDepth(t *testing.T) {
 	revision, err := common.WriteTuples(ctx, ds, corev1.RelationTupleUpdate_CREATE, tpl)
 	require.NoError(err)

-	dis := NewLocalOnlyDispatcher(10)
+	dis := NewLocalOnlyDispatcher(10, 100)

 	stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupSubjectsResponse](ctx)
 	err = dis.DispatchLookupSubjects(&v1.DispatchLookupSubjectsRequest{
@@ -995,7 +995,7 @@ func TestLookupSubjectsOverSchema(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			require := require.New(t)

-			dispatcher := NewLocalOnlyDispatcher(10)
+			dispatcher := NewLocalOnlyDispatcher(10, 100)

 			ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 			require.NoError(err)
diff --git a/internal/dispatch/graph/reachableresources_test.go b/internal/dispatch/graph/reachableresources_test.go
index e1b1af6054..b5dbed7733 100644
--- a/internal/dispatch/graph/reachableresources_test.go
+++ b/internal/dispatch/graph/reachableresources_test.go
@@ -265,7 +265,7 @@ func BenchmarkReachableResources(b *testing.B) {
 	ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)

-	dispatcher := NewLocalOnlyDispatcher(10)
+	dispatcher := NewLocalOnlyDispatcher(10, 100)

 	ctx := datastoremw.ContextWithHandle(context.Background())
 	require.NoError(datastoremw.SetInContext(ctx, ds))
@@ -571,7 +571,7 @@ func TestCaveatedReachableResources(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			require := require.New(t)

-			dispatcher := NewLocalOnlyDispatcher(10)
+			dispatcher := NewLocalOnlyDispatcher(10, 100)

 			ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 			require.NoError(err)
@@ -691,7 +691,7 @@ func TestReachableResourcesMultipleEntrypointEarlyCancel(t *testing.T) {
 		testRels,
 		require.New(t),
 	)
-	dispatcher := NewLocalOnlyDispatcher(2)
+	dispatcher := NewLocalOnlyDispatcher(2, 100)

 	ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
 	require.NoError(t, datastoremw.SetInContext(ctx, ds))
@@ -765,7 +765,7 @@ func TestReachableResourcesCursors(t *testing.T) {
 	for _, subject := range subjects {
 		t.Run(subject, func(t *testing.T) {
-			dispatcher := NewLocalOnlyDispatcher(2)
+			dispatcher := NewLocalOnlyDispatcher(2, 100)

 			ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
 			require.NoError(t, datastoremw.SetInContext(ctx, ds))
@@ -870,7 +870,7 @@ func TestReachableResourcesPaginationWithLimit(t *testing.T) {
 	for _, limit := range []uint32{1, 10, 50, 100, 150, 250, 500} {
 		limit := limit
 		t.Run(fmt.Sprintf("limit-%d", limit), func(t *testing.T) {
-			dispatcher := NewLocalOnlyDispatcher(2)
+			dispatcher := NewLocalOnlyDispatcher(2, 100)

 			var cursor *v1.Cursor
 			foundResources := mapz.NewSet[string]()
@@ -948,7 +948,7 @@ func TestReachableResourcesWithQueryError(t *testing.T) {
 		require.New(t),
 	)

-	dispatcher := NewLocalOnlyDispatcher(2)
+	dispatcher := NewLocalOnlyDispatcher(2, 100)

 	ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
@@ -1226,7 +1226,7 @@ func TestReachableResourcesOverSchema(t *testing.T) {
 			t.Run(fmt.Sprintf("ps-%d_", pageSize), func(t *testing.T) {
 				require := require.New(t)

-				dispatcher := NewLocalOnlyDispatcher(10)
+				dispatcher := NewLocalOnlyDispatcher(10, 100)

 				ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
 				require.NoError(err)
@@ -1310,7 +1310,7 @@ func TestReachableResourcesWithPreCancelation(t *testing.T) {
 		require.New(t),
 	)

-	dispatcher := NewLocalOnlyDispatcher(2)
+	dispatcher := NewLocalOnlyDispatcher(2, 100)

 	ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
 	require.NoError(t, datastoremw.SetInContext(ctx, ds))
@@ -1365,7 +1365,7 @@ func TestReachableResourcesWithUnexpectedContextCancelation(t *testing.T) {
 		require.New(t),
 	)

-	dispatcher := NewLocalOnlyDispatcher(2)
+	dispatcher := NewLocalOnlyDispatcher(2, 100)

 	ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
@@ -1465,7 +1465,7 @@ func TestReachableResourcesWithCachingInParallelTest(t *testing.T) {
 	ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background()))
 	require.NoError(t, datastoremw.SetInContext(ctx, ds))

-	dispatcher := NewLocalOnlyDispatcher(50)
+	dispatcher := NewLocalOnlyDispatcher(50, 100)

 	cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
 	require.NoError(t, err)
diff --git a/internal/graph/check.go b/internal/graph/check.go
index 108fee65d9..878884e9ee 100644
--- a/internal/graph/check.go
+++ b/internal/graph/check.go
@@ -49,15 +49,16 @@ func init() {
 }

 // NewConcurrentChecker creates an instance of ConcurrentChecker.
-func NewConcurrentChecker(d dispatch.Check, concurrencyLimit uint16) *ConcurrentChecker {
-	return &ConcurrentChecker{d, concurrencyLimit}
+func NewConcurrentChecker(d dispatch.Check, concurrencyLimit uint16, dispatchChunkSize uint16) *ConcurrentChecker {
+	return &ConcurrentChecker{d, concurrencyLimit, dispatchChunkSize}
 }

 // ConcurrentChecker exposes a method to perform Check requests, and delegates subproblems to the
 // provided dispatch.Check instance.
 type ConcurrentChecker struct {
-	d                dispatch.Check
-	concurrencyLimit uint16
+	d                 dispatch.Check
+	concurrencyLimit  uint16
+	dispatchChunkSize uint16
 }

 // ValidatedCheckRequest represents a request after it has been validated and parsed for internal
@@ -93,8 +94,8 @@ type currentRequestContext struct {
 	// requests.
 	resultsSetting v1.DispatchCheckRequest_ResultsSetting

-	// maxDispatchCount is the maximum number of resource IDs that can be specified in each dispatch.
-	maxDispatchCount uint16
+	// dispatchChunkSize is the maximum number of resource IDs that can be specified in each dispatch.
+	dispatchChunkSize uint16
 }

 // Check performs a check request with the provided request and context
@@ -216,11 +217,11 @@ func (cc *ConcurrentChecker) checkInternal(ctx context.Context, req ValidatedChe
 		parentReq:           req,
 		filteredResourceIDs: filteredResourcesIds,
 		resultsSetting:      resultsSetting,
-		maxDispatchCount:    maxDispatchChunkSize,
+		dispatchChunkSize:   cc.dispatchChunkSize,
 	}

 	if req.Debug == v1.DispatchCheckRequest_ENABLE_TRACE_DEBUGGING {
-		crc.maxDispatchCount = 1
+		crc.dispatchChunkSize = 1
 	}

 	if relation.UsersetRewrite == nil {
@@ -472,11 +473,11 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, crc currentRequest
 	// Convert the subjects into batched requests.
 	// To simplify the logic, +1 is added to account for the situation where
 	// the number of elements is less than the chunk size, and spare us some annoying code.
-	expectedNumberOfChunks := subjectsToDispatch.ValueLen()/int(crc.maxDispatchCount) + 1
+	expectedNumberOfChunks := subjectsToDispatch.ValueLen()/int(crc.dispatchChunkSize) + 1
 	toDispatch := make([]directDispatch, 0, expectedNumberOfChunks)
 	subjectsToDispatch.ForEachType(func(rr *core.RelationReference, resourceIds []string) {
 		chunkCount := 0.0
-		slicez.ForEachChunk(resourceIds, crc.maxDispatchCount, func(resourceIdChunk []string) {
+		slicez.ForEachChunk(resourceIds, crc.dispatchChunkSize, func(resourceIdChunk []string) {
 			chunkCount++
 			toDispatch = append(toDispatch, directDispatch{
 				resourceType: rr,
@@ -730,11 +731,11 @@ func checkIntersectionTupleToUserset(
 	// Convert the subjects into batched requests.
 	// To simplify the logic, +1 is added to account for the situation where
 	// the number of elements is less than the chunk size, and spare us some annoying code.
-	expectedNumberOfChunks := uint16(subjectsToDispatch.ValueLen())/crc.maxDispatchCount + 1
+	expectedNumberOfChunks := uint16(subjectsToDispatch.ValueLen())/crc.dispatchChunkSize + 1
 	toDispatch := make([]directDispatch, 0, expectedNumberOfChunks)
 	subjectsToDispatch.ForEachType(func(rr *core.RelationReference, resourceIds []string) {
 		chunkCount := 0.0
-		slicez.ForEachChunk(resourceIds, crc.maxDispatchCount, func(resourceIdChunk []string) {
+		slicez.ForEachChunk(resourceIds, crc.dispatchChunkSize, func(resourceIdChunk []string) {
 			chunkCount++
 			toDispatch = append(toDispatch, directDispatch{
 				resourceType: rr,
@@ -757,7 +758,7 @@ func checkIntersectionTupleToUserset(
 			parentReq:           crc.parentReq,
 			filteredResourceIDs: crc.filteredResourceIDs,
 			resultsSetting:      v1.DispatchCheckRequest_REQUIRE_ALL_RESULTS,
-			maxDispatchCount:    crc.maxDispatchCount,
+			dispatchChunkSize:   crc.dispatchChunkSize,
 		},
 		toDispatch,
 		func(ctx context.Context, crc currentRequestContext, dd directDispatch) checkResultWithType {
@@ -913,11 +914,11 @@ func checkTupleToUserset[T relation](
 	// Convert the subjects into batched requests.
 	// To simplify the logic, +1 is added to account for the situation where
 	// the number of elements is less than the chunk size, and spare us some annoying code.
-	expectedNumberOfChunks := uint16(subjectsToDispatch.ValueLen())/crc.maxDispatchCount + 1
+	expectedNumberOfChunks := uint16(subjectsToDispatch.ValueLen())/crc.dispatchChunkSize + 1
 	toDispatch := make([]directDispatch, 0, expectedNumberOfChunks)
 	subjectsToDispatch.ForEachType(func(rr *core.RelationReference, resourceIds []string) {
 		chunkCount := 0.0
-		slicez.ForEachChunk(resourceIds, crc.maxDispatchCount, func(resourceIdChunk []string) {
+		slicez.ForEachChunk(resourceIds, crc.dispatchChunkSize, func(resourceIdChunk []string) {
 			chunkCount++
 			toDispatch = append(toDispatch, directDispatch{
 				resourceType: rr,
@@ -1061,7 +1062,7 @@ func all[T any](
 		parentReq:           crc.parentReq,
 		filteredResourceIDs: crc.filteredResourceIDs,
 		resultsSetting:      v1.DispatchCheckRequest_REQUIRE_ALL_RESULTS,
-		maxDispatchCount:    crc.maxDispatchCount,
+		dispatchChunkSize:   crc.dispatchChunkSize,
 	}, children, handler, resultChan, concurrencyLimit)
 	defer cancelFn()
@@ -1117,7 +1118,7 @@ func difference[T any](
 			parentReq:           crc.parentReq,
 			filteredResourceIDs: crc.filteredResourceIDs,
 			resultsSetting:      v1.DispatchCheckRequest_REQUIRE_ALL_RESULTS,
-			maxDispatchCount:    crc.maxDispatchCount,
+			dispatchChunkSize:   crc.dispatchChunkSize,
 		}, children[0])
 		baseChan <- result
 	}()
@@ -1126,7 +1127,7 @@ func difference[T any](
 		parentReq:           crc.parentReq,
 		filteredResourceIDs: crc.filteredResourceIDs,
 		resultsSetting:      v1.DispatchCheckRequest_REQUIRE_ALL_RESULTS,
-		maxDispatchCount:    crc.maxDispatchCount,
+		dispatchChunkSize:   crc.dispatchChunkSize,
 	}, children[1:], handler, othersChan, concurrencyLimit-1)
 	defer cancelFn()
diff --git a/internal/graph/checkingresourcestream.go b/internal/graph/checkingresourcestream.go
index eb03a30c29..62769220a5 100644
--- a/internal/graph/checkingresourcestream.go
+++ b/internal/graph/checkingresourcestream.go
@@ -11,7 +11,6 @@ import (

 	"github.com/authzed/spicedb/internal/dispatch"
 	"github.com/authzed/spicedb/internal/graph/computed"
-	"github.com/authzed/spicedb/pkg/datastore"
 	"github.com/authzed/spicedb/pkg/genutil/mapz"
 	v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
 	"github.com/authzed/spicedb/pkg/spiceerrors"
@@ -49,6 +48,8 @@ type resourceQueue struct {

 	// beingProcessed are those resources (keyed by orderingIndex) that are currently being processed.
 	beingProcessed map[uint64]possibleResource
+
+	dispatchChunkSize uint16
 }

 type processingStatus int
@@ -71,7 +72,7 @@ func (rq *resourceQueue) addPossibleResource(pr possibleResource) processingStat
 	}

 	rq.toProcess[pr.orderingIndex] = pr
-	if len(rq.toProcess) < int(datastore.FilterMaximumIDCount) {
+	if len(rq.toProcess) < int(rq.dispatchChunkSize) {
 		return awaitingMoreResources
 	}

@@ -105,7 +106,7 @@ func (rq *resourceQueue) selectResourcesToProcess(alwaysReturn bool) []possibleR
 	defer rq.lock.Unlock()

 	toProcess := maps.Values(rq.toProcess)
-	if !alwaysReturn && len(toProcess) < int(datastore.FilterMaximumIDCount) {
+	if !alwaysReturn && len(toProcess) < int(rq.dispatchChunkSize) {
 		return nil
 	}

@@ -211,6 +212,8 @@ type checkingResourceStream struct {

 	processingWaitGroup sync.WaitGroup
 	publishingWaitGroup sync.WaitGroup
+
+	dispatchChunkSize uint16
 }

 func newCheckingResourceStream(
@@ -222,6 +225,7 @@ func newCheckingResourceStream(
 	parentStream dispatch.Stream[*v1.DispatchLookupResourcesResponse],
 	limits *limitTracker,
 	concurrencyLimit uint16,
+	dispatchChunkSize uint16,
 ) *checkingResourceStream {
 	if concurrencyLimit == 0 {
 		concurrencyLimit = 1
@@ -251,10 +255,11 @@ func newCheckingResourceStream(
 		sem: make(chan struct{}, processingConcurrencyLimit),

 		rq: &resourceQueue{
-			ctx:            lookupContext,
-			toProcess:      map[uint64]possibleResource{},
-			beingProcessed: map[uint64]possibleResource{},
-			toPublish:      map[uint64]possibleResource{},
+			ctx:               lookupContext,
+			toProcess:         map[uint64]possibleResource{},
+			beingProcessed:    map[uint64]possibleResource{},
+			toPublish:         map[uint64]possibleResource{},
+			dispatchChunkSize: dispatchChunkSize,
 		},
 		reachableResourcesAreAvailableForProcessing: make(chan struct{}, concurrencyLimit),
 		reachableResourcesCompleted:                 make(chan struct{}, concurrencyLimit),
@@ -268,6 +273,7 @@ func newCheckingResourceStream(

 		processingWaitGroup: sync.WaitGroup{},
 		publishingWaitGroup: sync.WaitGroup{},
+		dispatchChunkSize:   dispatchChunkSize,
 	}

 	// Spawn the goroutine that will publish resources to the parent stream in the proper order.
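The chunk arithmetic used throughout these hunks (expectedNumberOfChunks := n/chunkSize + 1, plus the queue's hold-until-a-full-chunk-accumulates check) can be seen in a small self-contained sketch; forEachChunk below is a hypothetical stand-in for the slicez.ForEachChunk helper the real code calls:

// Illustrative sketch only: how a dispatch chunk size of 100 partitions
// resource IDs into batches.
package main

import "fmt"

// forEachChunk is a hypothetical stand-in for slicez.ForEachChunk: it walks
// ids in windows of at most chunkSize elements.
func forEachChunk(ids []string, chunkSize uint16, fn func(chunk []string)) {
	for start := 0; start < len(ids); start += int(chunkSize) {
		end := start + int(chunkSize)
		if end > len(ids) {
			end = len(ids)
		}
		fn(ids[start:end])
	}
}

func main() {
	ids := make([]string, 250)
	for i := range ids {
		ids[i] = fmt.Sprintf("resource-%d", i)
	}

	const dispatchChunkSize = 100
	// Mirrors the patch's capacity hint: 250/100 + 1 = 3 expected chunks,
	// so the +1 covers the final partial chunk (and the case n < chunkSize).
	expectedNumberOfChunks := len(ids)/dispatchChunkSize + 1

	count := 0
	forEachChunk(ids, dispatchChunkSize, func(chunk []string) {
		count++
		fmt.Printf("chunk %d: %d resource IDs\n", count, len(chunk))
	})
	fmt.Println(count <= expectedNumberOfChunks) // true: chunks of 100, 100, 50
}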
@@ -463,6 +469,7 @@ func (crs *checkingResourceStream) runProcess(alwaysProcess bool) (bool, error) DebugOption: computed.NoDebugging, }, toCheck.Keys(), + crs.dispatchChunkSize, ) if err != nil { return true, err diff --git a/internal/graph/computed/computecheck.go b/internal/graph/computed/computecheck.go index d5d921af89..cf0860d236 100644 --- a/internal/graph/computed/computecheck.go +++ b/internal/graph/computed/computecheck.go @@ -56,8 +56,9 @@ func ComputeCheck( d dispatch.Check, params CheckParameters, resourceID string, + dispatchChunkSize uint16, ) (*v1.ResourceCheckResult, *v1.ResponseMeta, error) { - resultsMap, meta, err := computeCheck(ctx, d, params, []string{resourceID}) + resultsMap, meta, err := computeCheck(ctx, d, params, []string{resourceID}, dispatchChunkSize) if err != nil { return nil, meta, err } @@ -71,14 +72,16 @@ func ComputeBulkCheck( d dispatch.Check, params CheckParameters, resourceIDs []string, + dispatchChunkSize uint16, ) (map[string]*v1.ResourceCheckResult, *v1.ResponseMeta, error) { - return computeCheck(ctx, d, params, resourceIDs) + return computeCheck(ctx, d, params, resourceIDs, dispatchChunkSize) } func computeCheck(ctx context.Context, d dispatch.Check, params CheckParameters, resourceIDs []string, + dispatchChunkSize uint16, ) (map[string]*v1.ResourceCheckResult, *v1.ResponseMeta, error) { debugging := v1.DispatchCheckRequest_NO_DEBUG if params.DebugOption == BasicDebuggingEnabled { @@ -108,7 +111,7 @@ func computeCheck(ctx context.Context, } // TODO(jschorr): Should we make this run in parallel via the preloadedTaskRunner? - _, err = slicez.ForEachChunkUntil(resourceIDs, datastore.FilterMaximumIDCount, func(resourceIDsToCheck []string) (bool, error) { + _, err = slicez.ForEachChunkUntil(resourceIDs, dispatchChunkSize, func(resourceIDsToCheck []string) (bool, error) { checkResult, err := d.DispatchCheck(ctx, &v1.DispatchCheckRequest{ ResourceRelation: params.ResourceType, ResourceIds: resourceIDsToCheck, diff --git a/internal/graph/computed/computecheck_test.go b/internal/graph/computed/computecheck_test.go index 557cb96218..e8d943b196 100644 --- a/internal/graph/computed/computecheck_test.go +++ b/internal/graph/computed/computecheck_test.go @@ -806,7 +806,7 @@ func TestComputeCheckWithCaveats(t *testing.T) { ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC) require.NoError(t, err) - dispatch := graph.NewLocalOnlyDispatcher(10) + dispatch := graph.NewLocalOnlyDispatcher(10, 100) ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background())) require.NoError(t, datastoremw.SetInContext(ctx, ds)) @@ -831,6 +831,7 @@ func TestComputeCheckWithCaveats(t *testing.T) { DebugOption: computed.BasicDebuggingEnabled, }, rel.ResourceAndRelation.ObjectId, + 100, ) if r.error != "" { @@ -854,7 +855,7 @@ func TestComputeCheckError(t *testing.T) { ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC) require.NoError(t, err) - dispatch := graph.NewLocalOnlyDispatcher(10) + dispatch := graph.NewLocalOnlyDispatcher(10, 100) ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background())) require.NoError(t, datastoremw.SetInContext(ctx, ds)) @@ -871,6 +872,7 @@ func TestComputeCheckError(t *testing.T) { DebugOption: computed.BasicDebuggingEnabled, }, "id", + 100, ) require.Error(t, err) } @@ -879,7 +881,7 @@ func TestComputeBulkCheck(t *testing.T) { ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC) require.NoError(t, err) - dispatch := graph.NewLocalOnlyDispatcher(10) + dispatch := 
graph.NewLocalOnlyDispatcher(10, 100) ctx := log.Logger.WithContext(datastoremw.ContextWithHandle(context.Background())) require.NoError(t, datastoremw.SetInContext(ctx, ds)) @@ -923,6 +925,7 @@ func TestComputeBulkCheck(t *testing.T) { DebugOption: computed.NoDebugging, }, []string{"direct", "first", "second", "third"}, + 100, ) require.NoError(t, err) diff --git a/internal/graph/graph.go b/internal/graph/graph.go index e05074f89d..2a44189495 100644 --- a/internal/graph/graph.go +++ b/internal/graph/graph.go @@ -2,36 +2,15 @@ package graph import ( "context" - "testing" core "github.com/authzed/spicedb/pkg/proto/core/v1" - "github.com/authzed/spicedb/pkg/datastore" v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" ) // Ellipsis relation is used to signify a semantic-free relationship. const Ellipsis = "..." -// maxDispatchChunkSize is the maximum size for a dispatch chunk. Must be less than or equal -// to the maximum ID count for filters in the datastore. -var maxDispatchChunkSize uint16 = datastore.FilterMaximumIDCount - -// progressiveDispatchChunkSizes are chunk sizes growing over time for dispatching. All entries -// must be less than or equal to the maximum ID count for filters in the datastore. -var progressiveDispatchChunkSizes = []uint16{5, 10, 25, 50, maxDispatchChunkSize} - -// SetDispatchChunkSizesForTesting sets the dispatch chunk sizes for testing. -func SetDispatchChunkSizesForTesting(t *testing.T, sizes []uint16) { - originalSizes := progressiveDispatchChunkSizes - maxDispatchChunkSize = sizes[len(sizes)-1] - progressiveDispatchChunkSizes = sizes - t.Cleanup(func() { - progressiveDispatchChunkSizes = originalSizes - maxDispatchChunkSize = originalSizes[len(sizes)-1] - }) -} - // CheckResult is the data that is returned by a single check or sub-check. type CheckResult struct { Resp *v1.DispatchCheckResponse diff --git a/internal/graph/graph_test.go b/internal/graph/graph_test.go deleted file mode 100644 index 5c92c53a66..0000000000 --- a/internal/graph/graph_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package graph - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/authzed/spicedb/pkg/datastore" -) - -func TestChunkSizes(t *testing.T) { - for index, cs := range progressiveDispatchChunkSizes { - require.LessOrEqual(t, cs, datastore.FilterMaximumIDCount) - if index > 0 { - require.Greater(t, cs, progressiveDispatchChunkSizes[index-1]) - } - } -} - -func TestMaxDispatchChunkSize(t *testing.T) { - require.LessOrEqual(t, maxDispatchChunkSize, datastore.FilterMaximumIDCount) -} diff --git a/internal/graph/lookupresources.go b/internal/graph/lookupresources.go index fccfd7b8a1..f2d50342a4 100644 --- a/internal/graph/lookupresources.go +++ b/internal/graph/lookupresources.go @@ -12,16 +12,17 @@ import ( ) // NewCursoredLookupResources creates and instance of CursoredLookupResources. -func NewCursoredLookupResources(c dispatch.Check, r dispatch.ReachableResources, concurrencyLimit uint16) *CursoredLookupResources { - return &CursoredLookupResources{c, r, concurrencyLimit} +func NewCursoredLookupResources(c dispatch.Check, r dispatch.ReachableResources, concurrencyLimit uint16, dispatchChunkSize uint16) *CursoredLookupResources { + return &CursoredLookupResources{c, r, concurrencyLimit, dispatchChunkSize} } // CursoredLookupResources exposes a method to perform LookupResources requests, and delegates subproblems to the // provided dispatch.Lookup instance. 
type CursoredLookupResources struct {
-	c                dispatch.Check
-	r                dispatch.ReachableResources
-	concurrencyLimit uint16
+	c                 dispatch.Check
+	r                 dispatch.ReachableResources
+	concurrencyLimit  uint16
+	dispatchChunkSize uint16
}

// ValidatedLookupResourcesRequest represents a request after it has been validated and parsed for internal
@@ -56,7 +57,7 @@ func (cl *CursoredLookupResources) LookupResources(
	// to the parent stream, as found resources if they are properly checked.
	checkingStream := newCheckingResourceStream(lookupContext, reachableContext, func() {
		cancelReachable(errCanceledBecauseNoAdditionalResourcesNeeded)
-	}, req, cl.c, parentStream, limits, cl.concurrencyLimit)
+	}, req, cl.c, parentStream, limits, cl.concurrencyLimit, cl.dispatchChunkSize)

	err := cl.r.DispatchReachableResources(&v1.DispatchReachableResourcesRequest{
		ResourceRelation: req.ObjectRelation,

diff --git a/internal/graph/lookupresources2.go b/internal/graph/lookupresources2.go
index 20e2ac22bb..136dcaadb4 100644
--- a/internal/graph/lookupresources2.go
+++ b/internal/graph/lookupresources2.go
@@ -20,14 +20,15 @@ import (
	"github.com/authzed/spicedb/pkg/typesystem"
)

-func NewCursoredLookupResources2(dl dispatch.LookupResources2, dc dispatch.Check, concurrencyLimit uint16) *CursoredLookupResources2 {
-	return &CursoredLookupResources2{dl, dc, concurrencyLimit}
+func NewCursoredLookupResources2(dl dispatch.LookupResources2, dc dispatch.Check, concurrencyLimit uint16, dispatchChunkSize uint16) *CursoredLookupResources2 {
+	return &CursoredLookupResources2{dl, dc, concurrencyLimit, dispatchChunkSize}
}

type CursoredLookupResources2 struct {
-	dl               dispatch.LookupResources2
-	dc               dispatch.Check
-	concurrencyLimit uint16
+	dl                dispatch.LookupResources2
+	dc                dispatch.Check
+	concurrencyLimit  uint16
+	dispatchChunkSize uint16
}

type ValidatedLookupResources2Request struct {
@@ -282,7 +283,7 @@ func (crr *CursoredLookupResources2) redispatchOrReportOverDatabaseQuery(
-	// Chunk based on the FilterMaximumIDCount, to ensure we never send more than that amount of
-	// results to a downstream dispatch.
+	// Chunk based on the configured dispatch chunk size, to ensure we never send more than that
+	// amount of results to a downstream dispatch.
- rsm := newResourcesSubjectMap2WithCapacity(config.sourceResourceType, uint32(datastore.FilterMaximumIDCount)) + rsm := newResourcesSubjectMap2WithCapacity(config.sourceResourceType, uint32(crr.dispatchChunkSize)) toBeHandled := make([]itemAndPostCursor[dispatchableResourcesSubjectMap2], 0) currentCursor := queryCursor @@ -319,12 +320,12 @@ func (crr *CursoredLookupResources2) redispatchOrReportOverDatabaseQuery( return nil, err } - if rsm.len() == int(datastore.FilterMaximumIDCount) { + if rsm.len() == int(crr.dispatchChunkSize) { toBeHandled = append(toBeHandled, itemAndPostCursor[dispatchableResourcesSubjectMap2]{ item: rsm.asReadOnly(), cursor: currentCursor, }) - rsm = newResourcesSubjectMap2WithCapacity(config.sourceResourceType, uint32(datastore.FilterMaximumIDCount)) + rsm = newResourcesSubjectMap2WithCapacity(config.sourceResourceType, uint32(crr.dispatchChunkSize)) currentCursor = tpl } } @@ -517,7 +518,7 @@ func (crr *CursoredLookupResources2) redispatchOrReport( MaximumDepth: parentRequest.Metadata.DepthRemaining - 1, DebugOption: computed.NoDebugging, CheckHints: checkHints, - }, resourceIDs) + }, resourceIDs, crr.dispatchChunkSize) if err != nil { return err } @@ -625,6 +626,7 @@ func (crr *CursoredLookupResources2) redispatchOrReport( crr.dl, crr.dc, crr.concurrencyLimit, + crr.dispatchChunkSize, ) }) } diff --git a/internal/graph/lookupsubjects.go b/internal/graph/lookupsubjects.go index 510b82ce52..542678fdbd 100644 --- a/internal/graph/lookupsubjects.go +++ b/internal/graph/lookupsubjects.go @@ -33,13 +33,14 @@ type ValidatedLookupSubjectsRequest struct { } // NewConcurrentLookupSubjects creates an instance of ConcurrentLookupSubjects. -func NewConcurrentLookupSubjects(d dispatch.LookupSubjects, concurrencyLimit uint16) *ConcurrentLookupSubjects { - return &ConcurrentLookupSubjects{d, concurrencyLimit} +func NewConcurrentLookupSubjects(d dispatch.LookupSubjects, concurrencyLimit uint16, dispatchChunkSize uint16) *ConcurrentLookupSubjects { + return &ConcurrentLookupSubjects{d, concurrencyLimit, dispatchChunkSize} } type ConcurrentLookupSubjects struct { - d dispatch.LookupSubjects - concurrencyLimit uint16 + d dispatch.LookupSubjects + concurrencyLimit uint16 + dispatchChunkSize uint16 } func (cl *ConcurrentLookupSubjects) LookupSubjects( @@ -621,7 +622,7 @@ func (cl *ConcurrentLookupSubjects) dispatchTo( } // Dispatch the found subjects as the resources of the next step. - slicez.ForEachChunk(resourceIds, maxDispatchChunkSize, func(resourceIdChunk []string) { + slicez.ForEachChunk(resourceIds, cl.dispatchChunkSize, func(resourceIdChunk []string) { g.Go(func() error { return cl.d.DispatchLookupSubjects(&v1.DispatchLookupSubjectsRequest{ ResourceRelation: resourceType, diff --git a/internal/graph/lr2streams.go b/internal/graph/lr2streams.go index 7c0d66bb3f..af601be0c7 100644 --- a/internal/graph/lr2streams.go +++ b/internal/graph/lr2streams.go @@ -8,7 +8,6 @@ import ( "github.com/authzed/spicedb/internal/graph/computed" "github.com/authzed/spicedb/internal/graph/hints" "github.com/authzed/spicedb/internal/taskrunner" - "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/genutil/mapz" core "github.com/authzed/spicedb/pkg/proto/core/v1" v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" @@ -31,6 +30,7 @@ func runDispatchAndChecker( lrDispatcher dispatch.LookupResources2, checkDispatcher dispatch.Check, concurrencyLimit uint16, + dispatchChunkSize uint16, ) error { // Only allow max one dispatcher and one checker to run concurrently. 
concurrencyLimit = min(concurrencyLimit, 2)
@@ -47,6 +47,7 @@ func runDispatchAndChecker(
		checkDispatcher: checkDispatcher,
		taskrunner:      taskrunner.NewTaskRunner(ctx, concurrencyLimit),
		lock:            &sync.Mutex{},
+		dispatchChunkSize: dispatchChunkSize,
	}

	return rdc.runAndWait()
@@ -62,6 +63,7 @@ type rdc struct {
	entrypoint      typesystem.ReachabilityEntrypoint
	lrDispatcher    dispatch.LookupResources2
	checkDispatcher dispatch.Check
+	dispatchChunkSize uint16

	taskrunner *taskrunner.TaskRunner
@@ -80,7 +82,7 @@ func (rdc *rdc) dispatchAndCollect(ctx context.Context, cursor *v1.Cursor) ([]*v
			DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
		},
		OptionalCursor: cursor,
-		OptionalLimit:  uint32(datastore.FilterMaximumIDCount),
+		OptionalLimit:  uint32(rdc.dispatchChunkSize),
	}, collectingStream)
	return collectingStream.Results(), err
}
@@ -151,7 +153,7 @@ func (rdc *rdc) runChecker(ctx context.Context, collected []*v1.DispatchLookupRe
		MaximumDepth: rdc.parentRequest.Metadata.DepthRemaining - 1,
		DebugOption:  computed.NoDebugging,
		CheckHints:   checkHints,
-	}, resourceIDsToCheck)
+	}, resourceIDsToCheck, rdc.dispatchChunkSize)
	if err != nil {
		return err
	}

diff --git a/internal/graph/reachableresources.go b/internal/graph/reachableresources.go
index 5515a2a68d..f8c0157392 100644
--- a/internal/graph/reachableresources.go
+++ b/internal/graph/reachableresources.go
@@ -23,15 +23,16 @@ import (
const dispatchVersion = 1

// NewCursoredReachableResources creates an instance of CursoredReachableResources.
-func NewCursoredReachableResources(d dispatch.ReachableResources, concurrencyLimit uint16) *CursoredReachableResources {
-	return &CursoredReachableResources{d, concurrencyLimit}
+func NewCursoredReachableResources(d dispatch.ReachableResources, concurrencyLimit uint16, dispatchChunkSize uint16) *CursoredReachableResources {
+	return &CursoredReachableResources{d, concurrencyLimit, dispatchChunkSize}
}

// CursoredReachableResources exposes a method to perform ReachableResources requests, and
// delegates subproblems to the provided dispatch.ReachableResources instance.
type CursoredReachableResources struct {
-	d                dispatch.ReachableResources
-	concurrencyLimit uint16
+	d                 dispatch.ReachableResources
+	concurrencyLimit  uint16
+	dispatchChunkSize uint16
}

// ValidatedReachableResourcesRequest represents a request after it has been validated and parsed for internal
@@ -281,7 +282,7 @@ func (crr *CursoredReachableResources) redispatchOrReportOverDatabaseQuery(
-	// Chunk based on the FilterMaximumIDCount, to ensure we never send more than that amount of
-	// results to a downstream dispatch.
+	// Chunk based on the configured dispatch chunk size, to ensure we never send more than that
+	// amount of results to a downstream dispatch.
- rsm := newResourcesSubjectMapWithCapacity(config.sourceResourceType, uint32(datastore.FilterMaximumIDCount)) + rsm := newResourcesSubjectMapWithCapacity(config.sourceResourceType, uint32(crr.dispatchChunkSize)) toBeHandled := make([]itemAndPostCursor[dispatchableResourcesSubjectMap], 0) currentCursor := queryCursor @@ -294,12 +295,12 @@ func (crr *CursoredReachableResources) redispatchOrReportOverDatabaseQuery( return nil, err } - if rsm.len() == int(datastore.FilterMaximumIDCount) { + if rsm.len() == int(crr.dispatchChunkSize) { toBeHandled = append(toBeHandled, itemAndPostCursor[dispatchableResourcesSubjectMap]{ item: rsm.asReadOnly(), cursor: currentCursor, }) - rsm = newResourcesSubjectMapWithCapacity(config.sourceResourceType, uint32(datastore.FilterMaximumIDCount)) + rsm = newResourcesSubjectMapWithCapacity(config.sourceResourceType, uint32(crr.dispatchChunkSize)) currentCursor = tpl } } diff --git a/internal/services/integrationtesting/cert_test.go b/internal/services/integrationtesting/cert_test.go index 516aa6de19..91cc9e0df4 100644 --- a/internal/services/integrationtesting/cert_test.go +++ b/internal/services/integrationtesting/cert_test.go @@ -119,7 +119,7 @@ func TestCertRotation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) srv, err := server.NewConfigWithOptionsAndDefaults( server.WithDatastore(ds), - server.WithDispatcher(graph.NewLocalOnlyDispatcher(1)), + server.WithDispatcher(graph.NewLocalOnlyDispatcher(1, 100)), server.WithDispatchMaxDepth(50), server.WithMaximumPreconditionCount(1000), server.WithMaximumUpdatesPerWrite(1000), diff --git a/internal/services/integrationtesting/consistency_test.go b/internal/services/integrationtesting/consistency_test.go index 2ca87c9c4f..bc17eb7df4 100644 --- a/internal/services/integrationtesting/consistency_test.go +++ b/internal/services/integrationtesting/consistency_test.go @@ -21,7 +21,6 @@ import ( "github.com/authzed/spicedb/internal/developmentmembership" "github.com/authzed/spicedb/internal/dispatch" - "github.com/authzed/spicedb/internal/graph" "github.com/authzed/spicedb/internal/services/integrationtesting/consistencytestutil" "github.com/authzed/spicedb/pkg/cmd/server" "github.com/authzed/spicedb/pkg/datastore" @@ -48,9 +47,6 @@ const testTimedelta = 1 * time.Second // both real-world schemas, as well as the full set of hand-constructed corner // cases so that the system can be fully exercised. func TestConsistency(t *testing.T) { - // Set dispatch sizes for testing. - graph.SetDispatchChunkSizesForTesting(t, []uint16{5, 10}) - // List all the defined consistency test files. 
consistencyTestFiles, err := consistencytestutil.ListTestConfigs()
	require.NoError(t, err)
@@ -66,8 +62,12 @@ func TestConsistency(t *testing.T) {
			for _, useLRV2 := range []bool{false, true} {
				useLRV2 := useLRV2
				t.Run(fmt.Sprintf("lrv2-%t", useLRV2), func(t *testing.T) {
-					t.Parallel()
-					runConsistencyTestSuiteForFile(t, filePath, dispatcherKind == "caching", useLRV2)
+					for _, chunkSize := range []uint16{5, 10} {
+						t.Run(fmt.Sprintf("chunk-size-%d", chunkSize), func(t *testing.T) {
+							t.Parallel()
+							runConsistencyTestSuiteForFile(t, filePath, dispatcherKind == "caching", chunkSize, useLRV2)
+						})
+					}
				})
			}
		})
@@ -76,8 +76,8 @@
	}
}

-func runConsistencyTestSuiteForFile(t *testing.T, filePath string, useCachingDispatcher bool, useLRV2 bool) {
-	options := []server.ConfigOption{}
+func runConsistencyTestSuiteForFile(t *testing.T, filePath string, useCachingDispatcher bool, chunkSize uint16, useLRV2 bool) {
+	options := []server.ConfigOption{server.WithDispatchChunkSize(chunkSize)}
	if useLRV2 {
		options = append(options, server.WithEnableExperimentalLookupResources(true))
	}

diff --git a/internal/services/integrationtesting/consistencytestutil/accessibilityset.go b/internal/services/integrationtesting/consistencytestutil/accessibilityset.go
index ee1737b918..848ec22681 100644
--- a/internal/services/integrationtesting/consistencytestutil/accessibilityset.go
+++ b/internal/services/integrationtesting/consistencytestutil/accessibilityset.go
@@ -107,7 +107,7 @@ func BuildAccessibilitySet(t *testing.T, ccd ConsistencyClusterAndData) *Accessi
	headRevision, err := ccd.DataStore.HeadRevision(ccd.Ctx)
	require.NoError(t, err)

-	dispatcher := graph.NewLocalOnlyDispatcher(defaultConcurrencyLimit)
+	dispatcher := graph.NewLocalOnlyDispatcher(defaultConcurrencyLimit, 100)

	permissionshipByRelationship := map[string]dispatchv1.ResourceCheckResult_Membership{}
	uncomputedPermissionshipByRelationship := map[string]dispatchv1.ResourceCheckResult_Membership{}
	accessibilityByRelationship := map[string]Accessibility{}
@@ -165,6 +165,7 @@
				MaximumDepth: 50,
			},
			possibleResourceID,
+			100,
		)
		require.NoError(t, err)
		membership = cr.Membership

diff --git a/internal/services/integrationtesting/consistencytestutil/clusteranddata.go b/internal/services/integrationtesting/consistencytestutil/clusteranddata.go
index eae495a202..cb38dbf365 100644
--- a/internal/services/integrationtesting/consistencytestutil/clusteranddata.go
+++ b/internal/services/integrationtesting/consistencytestutil/clusteranddata.go
@@ -83,12 +83,12 @@ func BuildDataAndCreateClusterForTesting(t *testing.T, consistencyTestFilePath s
// caching enabled.
func CreateDispatcherForTesting(t *testing.T, withCaching bool) dispatch.Dispatcher {
	require := require.New(t)
-	dispatcher := graph.NewLocalOnlyDispatcher(defaultConcurrencyLimit)
+	dispatcher := graph.NewLocalOnlyDispatcher(defaultConcurrencyLimit, 100)
	if withCaching {
		cachingDispatcher, err := caching.NewCachingDispatcher(nil, false, "", &keys.CanonicalKeyHandler{})
		require.NoError(err)

-		localDispatcher := graph.NewDispatcher(cachingDispatcher, graph.SharedConcurrencyLimits(10))
+		localDispatcher := graph.NewDispatcher(cachingDispatcher, graph.SharedConcurrencyLimits(10), 100)
		t.Cleanup(func() {
			err := localDispatcher.Close()
			require.NoError(err)

diff --git a/internal/services/v1/bulkcheck.go b/internal/services/v1/bulkcheck.go
index cb4d56194f..d5b49bd679 100644
--- a/internal/services/v1/bulkcheck.go
+++ b/internal/services/v1/bulkcheck.go
@@ -31,7 +31,8 @@ type bulkChecker struct {
	maxCaveatContextSize int
	maxConcurrency       uint16

-	dispatch dispatch.Dispatcher
+	dispatch          dispatch.Dispatcher
+	dispatchChunkSize uint16
}

func (bc *bulkChecker) checkBulkPermissions(ctx context.Context, req *v1.CheckBulkPermissionsRequest) (*v1.CheckBulkPermissionsResponse, error) {
@@ -161,7 +162,7 @@ func (bc *bulkChecker) checkBulkPermissions(ctx context.Context, req *v1.CheckBu
	for _, group := range groupedItems {
		group := group

-		slicez.ForEachChunk(group.resourceIDs, MaxBulkCheckDispatchChunkSize, func(resourceIDs []string) {
+		slicez.ForEachChunk(group.resourceIDs, bc.dispatchChunkSize, func(resourceIDs []string) {
			tr.Add(func(ctx context.Context) error {
				ds := datastoremw.MustFromContext(ctx).SnapshotReader(atRevision)
@@ -184,7 +185,7 @@
				}

				// Call bulk check to compute the check result(s) for the resource ID(s).
-				rcr, metadata, err := computed.ComputeBulkCheck(ctx, bc.dispatch, *group.params, resourceIDs)
+				rcr, metadata, err := computed.ComputeBulkCheck(ctx, bc.dispatch, *group.params, resourceIDs, bc.dispatchChunkSize)
				if err != nil {
					return appendResultsForError(group.params, resourceIDs, err)
				}

diff --git a/internal/services/v1/experimental.go b/internal/services/v1/experimental.go
index 4be1957f66..19036a11cf 100644
--- a/internal/services/v1/experimental.go
+++ b/internal/services/v1/experimental.go
@@ -47,7 +47,6 @@ const (

-// NewExperimentalServer creates a ExperimentalServiceServer instance.
+// NewExperimentalServer creates an ExperimentalServiceServer instance.
func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig PermissionsServerConfig, opts ...options.ExperimentalServerOptionsOption) v1.ExperimentalServiceServer {
	config := options.NewExperimentalServerOptionsWithOptionsAndDefaults(opts...)
-
	if config.DefaultExportBatchSize == 0 {
		log.
			Warn().
@@ -99,6 +98,7 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis maxCaveatContextSize: permServerConfig.MaxCaveatContextSize, maxConcurrency: config.BulkCheckMaxConcurrency, dispatch: dispatch, + dispatchChunkSize: permServerConfig.DispatchChunkSize, }, } } diff --git a/internal/services/v1/experimental_test.go b/internal/services/v1/experimental_test.go index 7bb9078e12..00d98dca8d 100644 --- a/internal/services/v1/experimental_test.go +++ b/internal/services/v1/experimental_test.go @@ -33,6 +33,8 @@ import ( "github.com/authzed/spicedb/pkg/tuple" ) +const defaultFilterMaximumIDCountForTest = 100 + func TestBulkImportRelationships(t *testing.T) { testCases := []struct { name string @@ -539,16 +541,16 @@ func TestBulkCheckPermission(t *testing.T) { { name: "chunking test", requests: (func() []string { - toReturn := make([]string, 0, datastore.FilterMaximumIDCount+5) - for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ { + toReturn := make([]string, 0, defaultFilterMaximumIDCountForTest+5) + for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ { toReturn = append(toReturn, fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i)) } return toReturn })(), response: (func() []bulkCheckTest { - toReturn := make([]bulkCheckTest, 0, datastore.FilterMaximumIDCount+5) - for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ { + toReturn := make([]bulkCheckTest, 0, defaultFilterMaximumIDCountForTest+5) + for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ { toReturn = append(toReturn, bulkCheckTest{ req: fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i), resp: v1.CheckPermissionResponse_PERMISSIONSHIP_NO_PERMISSION, @@ -562,23 +564,23 @@ func TestBulkCheckPermission(t *testing.T) { { name: "chunking test with errors", requests: (func() []string { - toReturn := make([]string, 0, datastore.FilterMaximumIDCount+6) + toReturn := make([]string, 0, defaultFilterMaximumIDCountForTest+6) toReturn = append(toReturn, `nondoc:masterplan#view@user:eng_lead`) - for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ { + for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ { toReturn = append(toReturn, fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i)) } return toReturn })(), response: (func() []bulkCheckTest { - toReturn := make([]bulkCheckTest, 0, datastore.FilterMaximumIDCount+6) + toReturn := make([]bulkCheckTest, 0, defaultFilterMaximumIDCountForTest+6) toReturn = append(toReturn, bulkCheckTest{ req: `nondoc:masterplan#view@user:eng_lead`, err: namespace.NewNamespaceNotFoundErr("nondoc"), }) - for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ { + for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ { toReturn = append(toReturn, bulkCheckTest{ req: fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i), resp: v1.CheckPermissionResponse_PERMISSIONSHIP_NO_PERMISSION, diff --git a/internal/services/v1/grouping.go b/internal/services/v1/grouping.go index 27163c68ed..04bcd6b85e 100644 --- a/internal/services/v1/grouping.go +++ b/internal/services/v1/grouping.go @@ -10,8 +10,6 @@ import ( core "github.com/authzed/spicedb/pkg/proto/core/v1" ) -var MaxBulkCheckDispatchChunkSize = datastore.FilterMaximumIDCount - type groupedCheckParameters struct { params *computed.CheckParameters resourceIDs []string diff --git a/internal/services/v1/permissions.go b/internal/services/v1/permissions.go index b19f23a925..6ab5e36a1f 100644 --- a/internal/services/v1/permissions.go +++ 
b/internal/services/v1/permissions.go
@@ -101,6 +101,7 @@ func (ps *permissionServer) CheckPermission(ctx context.Context, req *v1.CheckPe
			DebugOption: debugOption,
		},
		req.Resource.ObjectId,
+		ps.config.DispatchChunkSize,
	)

	usagemetrics.SetInContext(ctx, metadata)

diff --git a/internal/services/v1/permissions_test.go b/internal/services/v1/permissions_test.go
index e04445044d..c23886dca7 100644
--- a/internal/services/v1/permissions_test.go
+++ b/internal/services/v1/permissions_test.go
@@ -1894,16 +1894,16 @@ func TestCheckBulkPermissions(t *testing.T) {
		{
			name: "chunking test",
			requests: (func() []string {
-				toReturn := make([]string, 0, datastore.FilterMaximumIDCount+5)
-				for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ {
+				toReturn := make([]string, 0, defaultFilterMaximumIDCountForTest+5)
+				for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ {
					toReturn = append(toReturn, fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i))
				}
				return toReturn
			})(),
			response: (func() []bulkCheckTest {
-				toReturn := make([]bulkCheckTest, 0, datastore.FilterMaximumIDCount+5)
-				for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ {
+				toReturn := make([]bulkCheckTest, 0, defaultFilterMaximumIDCountForTest+5)
+				for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ {
					toReturn = append(toReturn, bulkCheckTest{
						req:  fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i),
						resp: v1.CheckPermissionResponse_PERMISSIONSHIP_NO_PERMISSION,
@@ -1917,23 +1917,23 @@
		{
			name: "chunking test with errors",
			requests: (func() []string {
-				toReturn := make([]string, 0, datastore.FilterMaximumIDCount+6)
+				toReturn := make([]string, 0, defaultFilterMaximumIDCountForTest+6)
				toReturn = append(toReturn, `nondoc:masterplan#view@user:eng_lead`)

-				for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ {
+				for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ {
					toReturn = append(toReturn, fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i))
				}
				return toReturn
			})(),
			response: (func() []bulkCheckTest {
-				toReturn := make([]bulkCheckTest, 0, datastore.FilterMaximumIDCount+6)
+				toReturn := make([]bulkCheckTest, 0, defaultFilterMaximumIDCountForTest+6)
				toReturn = append(toReturn, bulkCheckTest{
					req: `nondoc:masterplan#view@user:eng_lead`,
					err: namespace.NewNamespaceNotFoundErr("nondoc"),
				})

-				for i := 0; i < int(datastore.FilterMaximumIDCount+5); i++ {
+				for i := 0; i < int(defaultFilterMaximumIDCountForTest+5); i++ {
					toReturn = append(toReturn, bulkCheckTest{
						req:  fmt.Sprintf(`document:masterplan-%d#view@user:eng_lead`, i),
						resp: v1.CheckPermissionResponse_PERMISSIONSHIP_NO_PERMISSION,

diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go
index 0be7131154..2202ab9e56 100644
--- a/internal/services/v1/relationships.go
+++ b/internal/services/v1/relationships.go
@@ -56,6 +56,9 @@ type PermissionsServerConfig struct {
	// to the permissions server.
	MaximumAPIDepth uint32

+	// DispatchChunkSize is the maximum number of elements to dispatch in a single dispatch call.
+	DispatchChunkSize uint16
+
	// StreamingAPITimeout is the timeout for streaming APIs when no response has been
	// recently received.
StreamingAPITimeout time.Duration @@ -112,6 +115,7 @@ func NewPermissionsServer( MaxLookupResourcesLimit: defaultIfZero(config.MaxLookupResourcesLimit, 1_000), MaxBulkExportRelationshipsLimit: defaultIfZero(config.MaxBulkExportRelationshipsLimit, 100_000), UseExperimentalLookupResources2: config.UseExperimentalLookupResources2, + DispatchChunkSize: defaultIfZero(config.DispatchChunkSize, 100), } return &permissionServer{ @@ -135,6 +139,7 @@ func NewPermissionsServer( maxCaveatContextSize: configWithDefaults.MaxCaveatContextSize, maxConcurrency: configWithDefaults.MaxCheckBulkConcurrency, dispatch: dispatch, + dispatchChunkSize: configWithDefaults.DispatchChunkSize, }, } } diff --git a/internal/testserver/server.go b/internal/testserver/server.go index 5b311c0eda..5c1b151994 100644 --- a/internal/testserver/server.go +++ b/internal/testserver/server.go @@ -74,7 +74,7 @@ func NewTestServerWithConfigAndDatastore(require *require.Assertions, ctx, cancel := context.WithCancel(context.Background()) srv, err := server.NewConfigWithOptions( server.WithDatastore(ds), - server.WithDispatcher(graph.NewLocalOnlyDispatcher(10)), + server.WithDispatcher(graph.NewLocalOnlyDispatcher(10, 100)), server.WithDispatchMaxDepth(50), server.WithMaximumPreconditionCount(config.MaxPreconditionsCount), server.WithMaximumUpdatesPerWrite(config.MaxUpdatesPerWrite), diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index a7235ff61a..bb61aea05c 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -102,6 +102,7 @@ type Config struct { RevisionQuantization time.Duration `debugmap:"visible"` MaxRevisionStalenessPercent float64 `debugmap:"visible"` CredentialsProviderName string `debugmap:"visible"` + FilterMaximumIDCount uint16 `debugmap:"hidden"` // Options ReadConnPool ConnPoolConfig `debugmap:"visible"` @@ -286,6 +287,7 @@ func DefaultDatastoreConfig() *Config { FollowerReadDelay: 4_800 * time.Millisecond, SpannerMinSessions: 100, SpannerMaxSessions: 400, + FilterMaximumIDCount: 100, } } @@ -412,6 +414,7 @@ func newCRDBDatastore(ctx context.Context, opts Config) (datastore.Datastore, er crdb.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), crdb.WithEnableConnectionBalancing(opts.EnableConnectionBalancing), crdb.ConnectRate(opts.ConnectRate), + crdb.FilterMaximumIDCount(opts.FilterMaximumIDCount), ) } @@ -442,6 +445,7 @@ func commonPostgresDatastoreOptions(opts Config) []postgres.Option { postgres.EnableTracing(), postgres.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), postgres.MaxRetries(uint8(opts.MaxRetries)), + postgres.FilterMaximumIDCount(opts.FilterMaximumIDCount), } } @@ -513,6 +517,7 @@ func newSpannerDatastore(ctx context.Context, opts Config) (datastore.Datastore, spanner.MinSessionCount(opts.SpannerMinSessions), spanner.MaxSessionCount(opts.SpannerMaxSessions), spanner.MigrationPhase(opts.MigrationPhase), + spanner.FilterMaximumIDCount(opts.FilterMaximumIDCount), ) } @@ -546,6 +551,7 @@ func commonMySQLDatastoreOptions(opts Config) []mysql.Option { mysql.WithEnablePrometheusStats(opts.EnableDatastoreMetrics), mysql.MaxRevisionStalenessPercent(opts.MaxRevisionStalenessPercent), mysql.RevisionQuantization(opts.RevisionQuantization), + mysql.FilterMaximumIDCount(opts.FilterMaximumIDCount), } } diff --git a/pkg/cmd/datastore/zz_generated.options.go b/pkg/cmd/datastore/zz_generated.options.go index 779180dd58..2601957ed7 100644 --- a/pkg/cmd/datastore/zz_generated.options.go +++ b/pkg/cmd/datastore/zz_generated.options.go @@ -38,6 +38,7 @@ 
func (c *Config) ToOption() ConfigOption {
	to.RevisionQuantization = c.RevisionQuantization
	to.MaxRevisionStalenessPercent = c.MaxRevisionStalenessPercent
	to.CredentialsProviderName = c.CredentialsProviderName
+	to.FilterMaximumIDCount = c.FilterMaximumIDCount
	to.ReadConnPool = c.ReadConnPool
	to.WriteConnPool = c.WriteConnPool
	to.ReadOnly = c.ReadOnly
@@ -185,6 +186,13 @@ func WithCredentialsProviderName(credentialsProviderName string) ConfigOption {
	}
}

+// WithFilterMaximumIDCount returns an option that can set FilterMaximumIDCount on a Config
+func WithFilterMaximumIDCount(filterMaximumIDCount uint16) ConfigOption {
+	return func(c *Config) {
+		c.FilterMaximumIDCount = filterMaximumIDCount
+	}
+}
+
// WithReadConnPool returns an option that can set ReadConnPool on a Config
func WithReadConnPool(readConnPool ConnPoolConfig) ConfigOption {
	return func(c *Config) {

diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go
index bfc705260c..1aedf2ca73 100644
--- a/pkg/cmd/serve.go
+++ b/pkg/cmd/serve.go
@@ -105,6 +105,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
	server.RegisterCacheFlags(cmd.Flags(), "dispatch-cluster-cache", &config.ClusterDispatchCacheConfig, dispatchClusterCacheDefaults)

	// Flags for configuring dispatch requests
+	cmd.Flags().Uint16Var(&config.DispatchChunkSize, "dispatch-chunk-size", 100, "maximum number of object IDs in a dispatched request")
	cmd.Flags().Uint32Var(&config.DispatchMaxDepth, "dispatch-max-depth", 50, "maximum recursion depth for nested calls")
	cmd.Flags().StringVar(&config.DispatchUpstreamAddr, "dispatch-upstream-addr", "", "upstream grpc address to dispatch to")
	cmd.Flags().StringVar(&config.DispatchUpstreamCAPath, "dispatch-upstream-ca-path", "", "local path to the TLS CA used when connecting to the dispatch cluster")

diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index 3abf6049f0..88262f3a05 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -96,6 +96,7 @@ type Config struct {
	Dispatcher                        dispatch.Dispatcher `debugmap:"visible"`
	DispatchHashringReplicationFactor uint16              `debugmap:"visible"`
	DispatchHashringSpread            uint8               `debugmap:"visible"`
+	DispatchChunkSize                 uint16              `debugmap:"visible" default:"100"`

	DispatchSecondaryUpstreamAddrs map[string]string `debugmap:"visible"`
	DispatchSecondaryUpstreamExprs map[string]string `debugmap:"visible"`
@@ -218,7 +219,10 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
	ds := c.Datastore
	if ds == nil {
		var err error
-		ds, err = datastorecfg.NewDatastore(context.Background(), c.DatastoreConfig.ToOption())
+		ds, err = datastorecfg.NewDatastore(context.Background(), c.DatastoreConfig.ToOption(),
+			// The datastore's filter maximum ID count is set to the dispatch chunk size, since the number
+			// of elements to be dispatched is at most the number of elements returned from a datastore query.
+			datastorecfg.WithFilterMaximumIDCount(c.DispatchChunkSize))
		if err != nil {
			return nil, spiceerrors.NewTerminationErrorBuilder(fmt.Errorf("failed to create datastore: %w", err)).
				Component("datastore").
@@ -293,6 +297,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { combineddispatch.PrometheusSubsystem(c.DispatchClientMetricsPrefix), combineddispatch.Cache(cc), combineddispatch.ConcurrencyLimits(concurrencyLimits), + combineddispatch.DispatchChunkSize(c.DispatchChunkSize), ) if err != nil { return nil, fmt.Errorf("failed to create dispatcher: %w", err) @@ -330,6 +335,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { clusterdispatch.Cache(cdcc), clusterdispatch.RemoteDispatchTimeout(c.DispatchUpstreamTimeout), clusterdispatch.ConcurrencyLimits(concurrencyLimits), + clusterdispatch.DispatchChunkSize(c.DispatchChunkSize), ) if err != nil { return nil, fmt.Errorf("failed to configure cluster dispatch: %w", err) @@ -419,6 +425,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { MaxLookupResourcesLimit: c.MaxLookupResourcesLimit, MaxBulkExportRelationshipsLimit: c.MaxBulkExportRelationshipsLimit, UseExperimentalLookupResources2: c.EnableExperimentalLookupResources, + DispatchChunkSize: c.DispatchChunkSize, } healthManager := health.NewHealthManager(dispatcher, ds) diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index b367c97fc2..e070e32b90 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -70,6 +70,7 @@ func (c *Config) ToOption() ConfigOption { to.Dispatcher = c.Dispatcher to.DispatchHashringReplicationFactor = c.DispatchHashringReplicationFactor to.DispatchHashringSpread = c.DispatchHashringSpread + to.DispatchChunkSize = c.DispatchChunkSize to.DispatchSecondaryUpstreamAddrs = c.DispatchSecondaryUpstreamAddrs to.DispatchSecondaryUpstreamExprs = c.DispatchSecondaryUpstreamExprs to.DispatchCacheConfig = c.DispatchCacheConfig @@ -136,6 +137,7 @@ func (c Config) DebugMap() map[string]any { debugMap["Dispatcher"] = helpers.DebugValue(c.Dispatcher, false) debugMap["DispatchHashringReplicationFactor"] = helpers.DebugValue(c.DispatchHashringReplicationFactor, false) debugMap["DispatchHashringSpread"] = helpers.DebugValue(c.DispatchHashringSpread, false) + debugMap["DispatchChunkSize"] = helpers.DebugValue(c.DispatchChunkSize, false) debugMap["DispatchSecondaryUpstreamAddrs"] = helpers.DebugValue(c.DispatchSecondaryUpstreamAddrs, false) debugMap["DispatchSecondaryUpstreamExprs"] = helpers.DebugValue(c.DispatchSecondaryUpstreamExprs, false) debugMap["DispatchCacheConfig"] = helpers.DebugValue(c.DispatchCacheConfig, false) @@ -417,6 +419,13 @@ func WithDispatchHashringSpread(dispatchHashringSpread uint8) ConfigOption { } } +// WithDispatchChunkSize returns an option that can set DispatchChunkSize on a Config +func WithDispatchChunkSize(dispatchChunkSize uint16) ConfigOption { + return func(c *Config) { + c.DispatchChunkSize = dispatchChunkSize + } +} + // WithDispatchSecondaryUpstreamAddrs returns an option that can append DispatchSecondaryUpstreamAddrss to Config.DispatchSecondaryUpstreamAddrs func WithDispatchSecondaryUpstreamAddrs(key string, value string) ConfigOption { return func(c *Config) { diff --git a/pkg/cmd/testserver/testserver.go b/pkg/cmd/testserver/testserver.go index 517fadfa0f..78895f2b3a 100644 --- a/pkg/cmd/testserver/testserver.go +++ b/pkg/cmd/testserver/testserver.go @@ -24,7 +24,11 @@ import ( "github.com/authzed/spicedb/pkg/datastore" ) -const maxDepth = 50 +const ( + maxDepth = 50 + defaultConcurrencyLimit = 10 + defaultMaxChunkSize = 100 +) //go:generate go run github.com/ecordell/optgen -output 
zz_generated.options.go . Config type Config struct { @@ -56,7 +60,7 @@ func (dr datastoreReady) ReadyState(_ context.Context) (datastore.ReadyState, er } func (c *Config) Complete() (RunnableTestServer, error) { - dispatcher := graph.NewLocalOnlyDispatcher(10) + dispatcher := graph.NewLocalOnlyDispatcher(defaultConcurrencyLimit, defaultMaxChunkSize) datastoreMiddleware := pertoken.NewMiddleware(c.LoadConfigs) @@ -78,6 +82,7 @@ func (c *Config) Complete() (RunnableTestServer, error) { MaxDeleteRelationshipsLimit: c.MaxDeleteRelationshipsLimit, MaxLookupResourcesLimit: c.MaxLookupResourcesLimit, MaxBulkExportRelationshipsLimit: c.MaxBulkExportRelationshipsLimit, + DispatchChunkSize: defaultMaxChunkSize, }, 1*time.Second, ) diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 99ba61cf02..3661cfca70 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -41,10 +41,6 @@ func EngineOptions() string { // hand side of a tuple. const Ellipsis = "..." -// FilterMaximumIDCount is the maximum number of resource IDs or subject IDs that can be sent into -// a filter. -const FilterMaximumIDCount uint16 = 100 - // RevisionChanges represents the changes in a single transaction. type RevisionChanges struct { Revision Revision diff --git a/pkg/development/check.go b/pkg/development/check.go index 429238217a..829da7052f 100644 --- a/pkg/development/check.go +++ b/pkg/development/check.go @@ -9,6 +9,8 @@ import ( v1dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" ) +const defaultWasmDispatchChunkSize = 100 + // CheckResult is the result of a RunCheck operation. type CheckResult struct { Permissionship v1dispatch.ResourceCheckResult_Membership @@ -36,6 +38,7 @@ func RunCheck(devContext *DevContext, resource *core.ObjectAndRelation, subject DebugOption: computed.TraceDebuggingEnabled, }, resource.ObjectId, + defaultWasmDispatchChunkSize, ) if err != nil { return CheckResult{v1dispatch.ResourceCheckResult_NOT_MEMBER, nil, nil, nil}, err diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index fcb93a7552..6a7609b741 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -111,7 +111,7 @@ func newDevContextWithDatastore(ctx context.Context, requestContext *devinterfac Datastore: ds, CompiledSchema: compiled, Revision: currentRevision, - Dispatcher: graph.NewLocalOnlyDispatcher(10), + Dispatcher: graph.NewLocalOnlyDispatcher(10, 100), }, nil, nil }
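
A note on the resulting API shape: with the global datastore.FilterMaximumIDCount constant removed, callers of the computed-check helpers pass the chunk size explicitly on every call, and dispatcher constructors take it alongside the concurrency limit. Below is a minimal sketch of the new call shape, modeled on the tests in this patch; it is illustrative only, the schema and relationship setup is elided, and any CheckParameters fields beyond those visible in this patch (Subject, AtRevision) are assumptions:

	package main

	import (
		"context"

		"github.com/authzed/spicedb/internal/datastore/memdb"
		"github.com/authzed/spicedb/internal/graph"
		"github.com/authzed/spicedb/internal/graph/computed"
		datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
		core "github.com/authzed/spicedb/pkg/proto/core/v1"
	)

	func main() {
		ds, _ := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
		ctx := datastoremw.ContextWithHandle(context.Background())
		_ = datastoremw.SetInContext(ctx, ds)

		// The second argument is the new dispatch chunk size; it was previously
		// implied by the global datastore.FilterMaximumIDCount constant.
		dispatcher := graph.NewLocalOnlyDispatcher(10, 100)

		head, _ := ds.HeadRevision(ctx)
		results, _, _ := computed.ComputeBulkCheck(ctx, dispatcher,
			computed.CheckParameters{
				ResourceType: &core.RelationReference{Namespace: "document", Relation: "view"},
				Subject:      &core.ObjectAndRelation{Namespace: "user", ObjectId: "tom", Relation: "..."},
				AtRevision:   head, // assumed field, mirroring the test setup
				MaximumDepth: 50,
				DebugOption:  computed.NoDebugging,
			},
			[]string{"first", "second", "third"},
			100, // dispatchChunkSize: at most 100 resource IDs per dispatched subproblem
		)
		_ = results
	}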
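At the server level, the chunk size is a single knob: the --dispatch-chunk-size flag registered in pkg/cmd/serve.go feeds Config.DispatchChunkSize, which Complete() hands to the combined and cluster dispatchers, to the permissions server, and (via WithFilterMaximumIDCount) to the datastore, keeping the dispatch bound and the filter bound aligned. A hedged configuration sketch using only the options visible in this patch; the option set a real deployment needs will differ:

	package main

	import (
		"context"

		"github.com/authzed/spicedb/pkg/cmd/server"
	)

	func buildServer(ctx context.Context) (server.RunnableServer, error) {
		// 200 object IDs per dispatched subproblem; Complete() also pins the
		// datastore's filter maximum ID count to this same value.
		config, err := server.NewConfigWithOptionsAndDefaults(
			server.WithDispatchChunkSize(200),
			server.WithDispatchMaxDepth(50),
		)
		if err != nil {
			return nil, err
		}
		return config.Complete(ctx)
	}

The equivalent on the command line is `spicedb serve --dispatch-chunk-size=200`.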
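When constructing a datastore directly rather than through the server config, the same bound is set with the new pkg/cmd/datastore option, which fans out to the per-engine options (crdb.FilterMaximumIDCount, postgres.FilterMaximumIDCount, spanner.FilterMaximumIDCount, mysql.FilterMaximumIDCount); the default remains 100. A small sketch under those assumptions, where ID lists larger than the bound are rejected by the SQL query filterer:

	package main

	import (
		"context"

		datastorecfg "github.com/authzed/spicedb/pkg/cmd/datastore"
	)

	func buildDatastore(ctx context.Context) error {
		cfg := datastorecfg.DefaultDatastoreConfig()

		// Raise the per-filter ID bound to match a larger dispatch chunk size.
		ds, err := datastorecfg.NewDatastore(ctx,
			cfg.ToOption(),
			datastorecfg.WithFilterMaximumIDCount(200),
		)
		if err != nil {
			return err
		}
		_ = ds
		return nil
	}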