Skip to content

Commit

Permalink
add support for configurable dispatch chunk size
Browse files Browse the repository at this point in the history
we've observed improvements on LR workloads when
increasing the dispatch chunk size
  • Loading branch information
josephschorr authored and vroldanbet committed Jul 24, 2024
1 parent c6e7500 commit f261a34
Show file tree
Hide file tree
Showing 56 changed files with 319 additions and 245 deletions.
10 changes: 1 addition & 9 deletions internal/caveats/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,11 @@ func runExpressionWithCaveats(
var exprStringPieces []string

var currentResult ExpressionResult = syntheticResult{
value: false,
value: cop.Op == core.CaveatOperation_AND,
contextValues: map[string]any{},
exprString: "",
missingContextParams: []string{},
}
if cop.Op == core.CaveatOperation_AND {
currentResult = syntheticResult{
value: true,
contextValues: map[string]any{},
exprString: "",
missingContextParams: []string{},
}
}

buildExprString := func() (string, error) {
if debugOption != RunCaveatExpressionWithDebugInformation {
Expand Down
12 changes: 7 additions & 5 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,16 @@ type SchemaQueryFilterer struct {
schema SchemaInformation
queryBuilder sq.SelectBuilder
filteringColumnCounts map[string]int
filterMaximumIDCount uint16
}

// NewSchemaQueryFilterer creates a new SchemaQueryFilterer object.
func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder) SchemaQueryFilterer {
func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder, filterMaximumIDCount uint16) SchemaQueryFilterer {
return SchemaQueryFilterer{
schema: schema,
queryBuilder: initialQuery,
filteringColumnCounts: map[string]int{},
filterMaximumIDCount: filterMaximumIDCount,
}
}

Expand Down Expand Up @@ -306,8 +308,8 @@ func (sqf SchemaQueryFilterer) MustFilterWithResourceIDPrefix(prefix string) Sch
// FilterToResourceIDs returns a new SchemaQueryFilterer that is limited to resources with any of the
// specified IDs.
func (sqf SchemaQueryFilterer) FilterToResourceIDs(resourceIds []string) (SchemaQueryFilterer, error) {
if len(resourceIds) > int(datastore.FilterMaximumIDCount) {
return sqf, spiceerrors.MustBugf("cannot have more than %d resources IDs in a single filter", datastore.FilterMaximumIDCount)
if len(resourceIds) > int(sqf.filterMaximumIDCount) {
return sqf, spiceerrors.MustBugf("cannot have more than %d resources IDs in a single filter", sqf.filterMaximumIDCount)
}

var builder strings.Builder
Expand Down Expand Up @@ -422,8 +424,8 @@ func (sqf SchemaQueryFilterer) FilterWithSubjectsSelectors(selectors ...datastor
}

if len(selector.OptionalSubjectIds) > 0 {
if len(selector.OptionalSubjectIds) > int(datastore.FilterMaximumIDCount) {
return sqf, spiceerrors.MustBugf("cannot have more than %d subject IDs in a single filter", datastore.FilterMaximumIDCount)
if len(selector.OptionalSubjectIds) > int(sqf.filterMaximumIDCount) {
return sqf, spiceerrors.MustBugf("cannot have more than %d subject IDs in a single filter", sqf.filterMaximumIDCount)
}

var builder strings.Builder
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/common/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func TestSchemaQueryFilterer(t *testing.T) {
"caveat",
TupleComparison,
)
filterer := NewSchemaQueryFilterer(schema, base)
filterer := NewSchemaQueryFilterer(schema, base, 100)

ran := test.run(filterer)
require.Equal(t, test.expectedColumnCounts, ran.filteringColumnCounts)
Expand All @@ -693,7 +693,7 @@ func BenchmarkSchemaFilterer(b *testing.B) {
"caveat_name",
TupleComparison,
)
sqf := NewSchemaQueryFilterer(si, sq.Select("*"))
sqf := NewSchemaQueryFilterer(si, sq.Select("*"), 100)
var names []string
for i := 0; i < 500; i++ {
names = append(names, uuid.NewString())
Expand Down
10 changes: 6 additions & 4 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,10 @@ type crdbDatastore struct {

featureGroup singleflight.Group[string, *datastore.Features]

pruneGroup *errgroup.Group
ctx context.Context
cancel context.CancelFunc
pruneGroup *errgroup.Group
ctx context.Context
cancel context.CancelFunc
filterMaximumIDCount uint16
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
Expand All @@ -289,7 +290,7 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade
return query.From(fromStr + " AS OF SYSTEM TIME " + rev.String())
}

return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder}
return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder, cds.filterMaximumIDCount}
}

func (cds *crdbDatastore) ReadWriteTx(
Expand Down Expand Up @@ -319,6 +320,7 @@ func (cds *crdbDatastore) ReadWriteTx(
func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder {
return query.From(fromStr)
},
cds.filterMaximumIDCount,
},
tx,
0,
Expand Down
11 changes: 9 additions & 2 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type crdbOptions struct {
overlapKey string
enableConnectionBalancing bool
analyzeBeforeStatistics bool

enablePrometheusStats bool
filterMaximumIDCount uint16
enablePrometheusStats bool
}

const (
Expand All @@ -48,6 +48,7 @@ const (
defaultEnablePrometheusStats = false
defaultEnableConnectionBalancing = true
defaultConnectRate = 100 * time.Millisecond
defaultFilterMaximumIDCount = 100
)

// Option provides the facility to configure how clients within the CRDB
Expand All @@ -68,6 +69,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
enablePrometheusStats: defaultEnablePrometheusStats,
enableConnectionBalancing: defaultEnableConnectionBalancing,
connectRate: defaultConnectRate,
filterMaximumIDCount: defaultFilterMaximumIDCount,
}

for _, option := range options {
Expand Down Expand Up @@ -306,3 +308,8 @@ func WithEnableConnectionBalancing(connectionBalancing bool) Option {
func DebugAnalyzeBeforeStatistics() Option {
return func(po *crdbOptions) { po.analyzeBeforeStatistics = true }
}

// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
return func(po *crdbOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
}
17 changes: 9 additions & 8 deletions internal/datastore/crdb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ var (
)

type crdbReader struct {
query pgxcommon.DBFuncQuerier
executor common.QueryExecutor
keyer overlapKeyer
overlapKeySet keySet
fromBuilder func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder
query pgxcommon.DBFuncQuerier
executor common.QueryExecutor
keyer overlapKeyer
overlapKeySet keySet
fromBuilder func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder
filterMaximumIDCount uint16
}

func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int, error) {
Expand All @@ -83,7 +84,7 @@ func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int,
}

query := cr.fromBuilder(countTuples, tableTuple)
builder, err := common.NewSchemaQueryFilterer(schema, query).FilterWithRelationshipsFilter(relFilter)
builder, err := common.NewSchemaQueryFilterer(schema, query, cr.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -208,7 +209,7 @@ func (cr *crdbReader) QueryRelationships(
opts ...options.QueryOptionsOption,
) (iter datastore.RelationshipIterator, err error) {
query := cr.fromBuilder(queryTuples, tableTuple)
qBuilder, err := common.NewSchemaQueryFilterer(schema, query).FilterWithRelationshipsFilter(filter)
qBuilder, err := common.NewSchemaQueryFilterer(schema, query, cr.filterMaximumIDCount).FilterWithRelationshipsFilter(filter)
if err != nil {
return nil, err
}
Expand All @@ -222,7 +223,7 @@ func (cr *crdbReader) ReverseQueryRelationships(
opts ...options.ReverseQueryOptionsOption,
) (iter datastore.RelationshipIterator, err error) {
query := cr.fromBuilder(queryTuples, tableTuple)
qBuilder, err := common.NewSchemaQueryFilterer(schema, query).
qBuilder, err := common.NewSchemaQueryFilterer(schema, query, cr.filterMaximumIDCount).
FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (mds *Datastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
createTxFunc,
executor,
buildLivingObjectFilterForRevision(rev),
mds.filterMaximumIDCount,
}
}

Expand Down Expand Up @@ -356,6 +357,7 @@ func (mds *Datastore) ReadWriteTx(
longLivedTx,
executor,
currentlyLivingObjects,
mds.filterMaximumIDCount,
},
mds.driver.RelationTuple(),
tx,
Expand Down Expand Up @@ -484,6 +486,7 @@ type Datastore struct {
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
maxRetries uint8
filterMaximumIDCount uint16

optimizedRevisionQuery string
validTransactionQuery string
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/mysql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
defaultMaxRetries = 8
defaultGCEnabled = true
defaultCredentialsProviderName = ""
defaultFilterMaximumIDCount = 100
)

type mysqlOptions struct {
Expand All @@ -42,6 +43,7 @@ type mysqlOptions struct {
lockWaitTimeoutSeconds *uint8
gcEnabled bool
credentialsProviderName string
filterMaximumIDCount uint16
}

// Option provides the facility to configure how clients within the
Expand All @@ -64,6 +66,7 @@ func generateConfig(options []Option) (mysqlOptions, error) {
maxRetries: defaultMaxRetries,
gcEnabled: defaultGCEnabled,
credentialsProviderName: defaultCredentialsProviderName,
filterMaximumIDCount: defaultFilterMaximumIDCount,
}

for _, option := range options {
Expand Down Expand Up @@ -247,3 +250,8 @@ func GCMaxOperationTime(time time.Duration) Option {
func CredentialsProviderName(credentialsProviderName string) Option {
return func(mo *mysqlOptions) { mo.credentialsProviderName = credentialsProviderName }
}

// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
return func(mo *mysqlOptions) { mo.filterMaximumIDCount = filterMaximumIDCount }
}
13 changes: 7 additions & 6 deletions internal/datastore/mysql/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ type txFactory func(context.Context) (*sql.Tx, txCleanupFunc, error)
type mysqlReader struct {
*QueryBuilder

txSource txFactory
executor common.QueryExecutor
filterer queryFilterer
txSource txFactory
executor common.QueryExecutor
filterer queryFilterer
filterMaximumIDCount uint16
}

type queryFilterer func(original sq.SelectBuilder) sq.SelectBuilder
Expand Down Expand Up @@ -65,7 +66,7 @@ func (mr *mysqlReader) CountRelationships(ctx context.Context, name string) (int
return 0, err
}

qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.CountTupleQuery)).FilterWithRelationshipsFilter(relFilter)
qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.CountTupleQuery), mr.filterMaximumIDCount).FilterWithRelationshipsFilter(relFilter)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -174,7 +175,7 @@ func (mr *mysqlReader) QueryRelationships(
filter datastore.RelationshipsFilter,
opts ...options.QueryOptionsOption,
) (iter datastore.RelationshipIterator, err error) {
qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery)).FilterWithRelationshipsFilter(filter)
qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery), mr.filterMaximumIDCount).FilterWithRelationshipsFilter(filter)
if err != nil {
return nil, err
}
Expand All @@ -187,7 +188,7 @@ func (mr *mysqlReader) ReverseQueryRelationships(
subjectsFilter datastore.SubjectsFilter,
opts ...options.ReverseQueryOptionsOption,
) (iter datastore.RelationshipIterator, err error) {
qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery)).
qBuilder, err := common.NewSchemaQueryFilterer(schema, mr.filterer(mr.QueryTuplesQuery), mr.filterMaximumIDCount).
FilterWithSubjectsSelectors(subjectsFilter.AsSelector())
if err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type postgresOptions struct {
gcInterval time.Duration
gcMaxOperationTime time.Duration
maxRetries uint8
filterMaximumIDCount uint16

enablePrometheusStats bool
analyzeBeforeStatistics bool
Expand Down Expand Up @@ -63,6 +64,7 @@ const (
defaultGCEnabled = true
defaultCredentialsProviderName = ""
defaultReadStrictMode = false
defaultFilterMaximumIDCount = 100
)

// Option provides the facility to configure how clients within the
Expand All @@ -84,6 +86,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
credentialsProviderName: defaultCredentialsProviderName,
readStrictMode: defaultReadStrictMode,
queryInterceptor: nil,
filterMaximumIDCount: defaultFilterMaximumIDCount,
}

for _, option := range options {
Expand Down Expand Up @@ -356,3 +359,8 @@ func MigrationPhase(phase string) Option {
func CredentialsProviderName(credentialsProviderName string) Option {
return func(po *postgresOptions) { po.credentialsProviderName = credentialsProviderName }
}

// FilterMaximumIDCount is the maximum number of IDs that can be used to filter IDs in queries
func FilterMaximumIDCount(filterMaximumIDCount uint16) Option {
return func(po *postgresOptions) { po.filterMaximumIDCount = filterMaximumIDCount }
}
12 changes: 8 additions & 4 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func newPostgresDatastore(
credentialsProvider: credentialsProvider,
isPrimary: isPrimary,
inStrictReadMode: config.readStrictMode,
filterMaximumIDCount: config.filterMaximumIDCount,
}

if isPrimary && config.readStrictMode {
Expand Down Expand Up @@ -384,10 +385,11 @@ type pgDatastore struct {

credentialsProvider datastore.CredentialsProvider

gcGroup *errgroup.Group
gcCtx context.Context
cancelGc context.CancelFunc
gcHasRun atomic.Bool
gcGroup *errgroup.Group
gcCtx context.Context
cancelGc context.CancelFunc
gcHasRun atomic.Bool
filterMaximumIDCount uint16
}

func (pgd *pgDatastore) IsStrictReadModeEnabled() bool {
Expand All @@ -410,6 +412,7 @@ func (pgd *pgDatastore) SnapshotReader(revRaw datastore.Revision) datastore.Read
queryFuncs,
executor,
buildLivingObjectFilterForRevision(rev),
pgd.filterMaximumIDCount,
}
}

Expand Down Expand Up @@ -447,6 +450,7 @@ func (pgd *pgDatastore) ReadWriteTx(
queryFuncs,
executor,
currentlyLivingObjects,
pgd.filterMaximumIDCount,
},
tx,
newXID,
Expand Down
Loading

0 comments on commit f261a34

Please sign in to comment.