Skip to content

Commit

Permalink
Remove unused usersets splitting from datastore interface
Browse files Browse the repository at this point in the history
Fixes #1411
  • Loading branch information
josephschorr committed Jul 18, 2023
1 parent e2bfef4 commit c16cb90
Show file tree
Hide file tree
Showing 22 changed files with 72 additions and 394 deletions.
80 changes: 20 additions & 60 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,51 +478,25 @@ func (sqf SchemaQueryFilterer) FilterWithCaveatName(caveatName string) SchemaQue
return sqf
}

// filterToUsersets returns a new SchemaQueryFilterer that is limited to resources with subjects
// in the specified list of usersets. A nil or empty usersets parameter does not affect the
// underlying query.
func (sqf SchemaQueryFilterer) filterToUsersets(usersets []*core.ObjectAndRelation) SchemaQueryFilterer {
if len(usersets) == 0 {
return sqf
}

orClause := sq.Or{}
for _, userset := range usersets {
orClause = append(orClause, sq.Eq{
sqf.schema.colUsersetNamespace: userset.Namespace,
sqf.schema.colUsersetObjectID: userset.ObjectId,
sqf.schema.colUsersetRelation: userset.Relation,
})
sqf.recordColumnValue(sqf.schema.colUsersetNamespace)
sqf.recordColumnValue(sqf.schema.colUsersetObjectID)
sqf.recordColumnValue(sqf.schema.colUsersetRelation)
}

sqf.queryBuilder = sqf.queryBuilder.Where(orClause)
return sqf
}

// Limit returns a new SchemaQueryFilterer which is limited to the specified number of results.
func (sqf SchemaQueryFilterer) limit(limit uint64) SchemaQueryFilterer {
sqf.queryBuilder = sqf.queryBuilder.Limit(limit)
sqf.tracerAttributes = append(sqf.tracerAttributes, limitKey.Int64(int64(limit)))
return sqf
}

// TupleQuerySplitter is a tuple query runner shared by SQL implementations of the datastore.
type TupleQuerySplitter struct {
Executor ExecuteQueryFunc
UsersetBatchSize uint16
// QueryExecutor is a tuple query runner shared by SQL implementations of the datastore.
type QueryExecutor struct {
Executor ExecuteQueryFunc
}

// SplitAndExecuteQuery is used to split up the usersets in a very large query and execute
// them as separate queries.
func (tqs TupleQuerySplitter) SplitAndExecuteQuery(
// ExecuteQuery executes the query.
func (tqs QueryExecutor) ExecuteQuery(
ctx context.Context,
query SchemaQueryFilterer,
opts ...options.QueryOptionsOption,
) (datastore.RelationshipIterator, error) {
ctx, span := tracer.Start(ctx, "SplitAndExecuteQuery")
ctx, span := tracer.Start(ctx, "ExecuteQuery")
defer span.End()
queryOpts := options.NewQueryOptionsWithOptions(opts...)

Expand All @@ -536,41 +510,27 @@ func (tqs TupleQuerySplitter) SplitAndExecuteQuery(
query = query.After(queryOpts.After, queryOpts.Sort)
}

var tuples []*core.RelationTuple
remainingLimit := math.MaxInt
limit := math.MaxInt
if queryOpts.Limit != nil {
remainingLimit = int(*queryOpts.Limit)
limit = int(*queryOpts.Limit)
}

remainingUsersets := queryOpts.Usersets
for remaining := 1; remaining > 0; remaining = len(remainingUsersets) {
upperBound := uint16(len(remainingUsersets))
if upperBound > tqs.UsersetBatchSize {
upperBound = tqs.UsersetBatchSize
}

batch := remainingUsersets[:upperBound]
toExecute := query.limit(uint64(remainingLimit)).filterToUsersets(batch)

sql, args, err := toExecute.queryBuilder.ToSql()
if err != nil {
return nil, err
}

queryTuples, err := tqs.Executor(ctx, sql, args)
if err != nil {
return nil, err
}
toExecute := query.limit(uint64(limit))
sql, args, err := toExecute.queryBuilder.ToSql()
if err != nil {
return nil, err
}

if len(queryTuples) > remainingLimit {
queryTuples = queryTuples[:remainingLimit]
}
queryTuples, err := tqs.Executor(ctx, sql, args)
if err != nil {
return nil, err
}

tuples = append(tuples, queryTuples...)
remainingUsersets = remainingUsersets[upperBound:]
if len(queryTuples) > limit {
queryTuples = queryTuples[:limit]
}

return NewSliceRelationshipIterator(tuples, queryOpts.Sort), nil
return NewSliceRelationshipIterator(queryTuples, queryOpts.Sort), nil
}

// ExecuteQueryFunc is a function that can be used to execute a single rendered SQL query.
Expand Down
26 changes: 0 additions & 26 deletions internal/datastore/common/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

func TestSchemaQueryFilterer(t *testing.T) {
Expand Down Expand Up @@ -403,31 +402,6 @@ func TestSchemaQueryFilterer(t *testing.T) {
"subject_relation": 1,
},
},
{
"empty filterToUsersets",
func(filterer SchemaQueryFilterer) SchemaQueryFilterer {
return filterer.filterToUsersets(nil)
},
"SELECT *",
nil,
map[string]int{},
},
{
"filterToUsersets",
func(filterer SchemaQueryFilterer) SchemaQueryFilterer {
return filterer.filterToUsersets([]*core.ObjectAndRelation{
tuple.ParseONR("document:foo#somerel"),
tuple.ParseONR("team:bar#member"),
})
},
"SELECT * WHERE (subject_ns = ? AND subject_object_id = ? AND subject_relation = ? OR subject_ns = ? AND subject_object_id = ? AND subject_relation = ?)",
[]any{"document", "foo", "somerel", "team", "bar", "member"},
map[string]int{
"subject_ns": 2,
"subject_object_id": 2,
"subject_relation": 2,
},
},
{
"limit",
func(filterer SchemaQueryFilterer) SchemaQueryFilterer {
Expand Down
16 changes: 6 additions & 10 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ func newCRDBDatastore(url string, options ...Option) (datastore.Datastore, error
watchBufferLength: config.watchBufferLength,
writeOverlapKeyer: keyer,
overlapKeyInit: keySetInit,
usersetBatchSize: config.splitAtUsersetCount,
disableStats: config.disableStats,
beginChangefeedQuery: changefeedQuery,
}
Expand Down Expand Up @@ -254,7 +253,6 @@ type crdbDatastore struct {
watchBufferLength uint16
writeOverlapKeyer overlapKeyer
overlapKeyInit func(ctx context.Context) keySet
usersetBatchSize uint16
disableStats bool

beginChangefeedQuery string
Expand All @@ -265,16 +263,15 @@ type crdbDatastore struct {
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(cds.readPool),
UsersetBatchSize: cds.usersetBatchSize,
executor := common.QueryExecutor{
Executor: pgxcommon.NewPGXExecutor(cds.readPool),
}

fromBuilder := func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder {
return query.From(fromStr + " AS OF SYSTEM TIME " + rev.String())
}

return &crdbReader{cds.readPool, querySplitter, noOverlapKeyer, nil, fromBuilder}
return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder}
}

func (cds *crdbDatastore) ReadWriteTx(
Expand All @@ -291,15 +288,14 @@ func (cds *crdbDatastore) ReadWriteTx(

err := cds.writePool.BeginFunc(ctx, func(tx pgx.Tx) error {
querier := pgxcommon.QuerierFuncsFor(tx)
querySplitter := common.TupleQuerySplitter{
Executor: pgxcommon.NewPGXExecutor(querier),
UsersetBatchSize: cds.usersetBatchSize,
executor := common.QueryExecutor{
Executor: pgxcommon.NewPGXExecutor(querier),
}

rwt := &crdbReadWriteTXN{
&crdbReader{
querier,
querySplitter,
executor,
cds.writeOverlapKeyer,
cds.overlapKeyInit(ctx),
func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder {
Expand Down
12 changes: 0 additions & 12 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type crdbOptions struct {
maxRevisionStalenessPercent float64
gcWindow time.Duration
maxRetries uint8
splitAtUsersetCount uint16
overlapStrategy string
overlapKey string
disableStats bool
Expand Down Expand Up @@ -60,7 +59,6 @@ func generateConfig(options []Option) (crdbOptions, error) {
revisionQuantization: defaultRevisionQuantization,
followerReadDelay: defaultFollowerReadDelay,
maxRevisionStalenessPercent: defaultMaxRevisionStalenessPercent,
splitAtUsersetCount: defaultSplitSize,
maxRetries: defaultMaxRetries,
overlapKey: defaultOverlapKey,
overlapStrategy: defaultOverlapStrategy,
Expand All @@ -86,16 +84,6 @@ func generateConfig(options []Option) (crdbOptions, error) {
return computed, nil
}

// SplitAtUsersetCount is the batch size for which userset queries will be
// split into smaller queries.
//
// This defaults to 1024.
func SplitAtUsersetCount(splitAtUsersetCount uint16) Option {
return func(po *crdbOptions) {
po.splitAtUsersetCount = splitAtUsersetCount
}
}

// ReadConnHealthCheckInterval is the frequency at which both idle and max
// lifetime connections are checked, and also the frequency at which the
// minimum number of connections is checked.
Expand Down
6 changes: 3 additions & 3 deletions internal/datastore/crdb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

type crdbReader struct {
query pgxcommon.DBFuncQuerier
querySplitter common.TupleQuerySplitter
executor common.QueryExecutor
keyer overlapKeyer
overlapKeySet keySet
fromBuilder func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder
Expand Down Expand Up @@ -100,7 +100,7 @@ func (cr *crdbReader) QueryRelationships(
return nil, err
}

return cr.querySplitter.SplitAndExecuteQuery(ctx, qBuilder, opts...)
return cr.executor.ExecuteQuery(ctx, qBuilder, opts...)
}

func (cr *crdbReader) ReverseQueryRelationships(
Expand All @@ -123,7 +123,7 @@ func (cr *crdbReader) ReverseQueryRelationships(
FilterToRelation(queryOpts.ResRelation.Relation)
}

return cr.querySplitter.SplitAndExecuteQuery(
return cr.executor.ExecuteQuery(
ctx,
qBuilder,
options.WithLimit(queryOpts.LimitForReverse),
Expand Down
16 changes: 0 additions & 16 deletions internal/datastore/memdb/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func (r *memdbReader) QueryRelationships(
filter.OptionalResourceRelation,
filter.OptionalSubjectsSelectors,
filter.OptionalCaveatName,
queryOpts.Usersets,
makeCursorFilterFn(queryOpts.After, queryOpts.Sort),
)
filteredIterator := memdb.NewFilterIterator(bestIterator, matchingRelationshipsFilterFunc)
Expand Down Expand Up @@ -128,7 +127,6 @@ func (r *memdbReader) ReverseQueryRelationships(
filterRelation,
[]datastore.SubjectsSelector{subjectsFilter.AsSelector()},
"",
nil,
makeCursorFilterFn(queryOpts.AfterForReverse, queryOpts.SortForReverse),
)
filteredIterator := memdb.NewFilterIterator(iterator, matchingRelationshipsFilterFunc)
Expand Down Expand Up @@ -284,7 +282,6 @@ func filterFuncForFilters(
optionalRelation string,
optionalSubjectsSelectors []datastore.SubjectsSelector,
optionalCaveatFilter string,
usersets []*core.ObjectAndRelation,
cursorFilter func(*relationship) bool,
) memdb.FilterFunc {
return func(tupleRaw interface{}) bool {
Expand Down Expand Up @@ -339,19 +336,6 @@ func filterFuncForFilters(
}
}

if len(usersets) > 0 {
found := false
for _, filter := range usersets {
if filter.Namespace == tuple.subjectNamespace &&
filter.ObjectId == tuple.subjectObjectID &&
filter.Relation == tuple.subjectRelation {
found = true
break
}
}
return !found
}

return cursorFilter(tuple)
}
}
Expand Down
16 changes: 6 additions & 10 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ func newMySQLDatastore(uri string, options ...Option) (*Datastore, error) {
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
usersetBatchSize: config.splitAtUsersetCount,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
createTxn: createTxn,
Expand Down Expand Up @@ -262,15 +261,14 @@ func (mds *Datastore) SnapshotReader(revisionRaw datastore.Revision) datastore.R
return tx, tx.Rollback, nil
}

querySplitter := common.TupleQuerySplitter{
Executor: newMySQLExecutor(mds.db),
UsersetBatchSize: mds.usersetBatchSize,
executor := common.QueryExecutor{
Executor: newMySQLExecutor(mds.db),
}

return &mysqlReader{
mds.QueryBuilder,
createTxFunc,
querySplitter,
executor,
buildLivingObjectFilterForRevision(rev),
}
}
Expand Down Expand Up @@ -300,16 +298,15 @@ func (mds *Datastore) ReadWriteTx(
return tx, noCleanup, nil
}

querySplitter := common.TupleQuerySplitter{
Executor: newMySQLExecutor(tx),
UsersetBatchSize: mds.usersetBatchSize,
executor := common.QueryExecutor{
Executor: newMySQLExecutor(tx),
}

rwt := &mysqlReadWriteTXN{
&mysqlReader{
mds.QueryBuilder,
longLivedTx,
querySplitter,
executor,
currentlyLivingObjects,
},
mds.driver.RelationTuple(),
Expand Down Expand Up @@ -426,7 +423,6 @@ type Datastore struct {
gcInterval time.Duration
gcTimeout time.Duration
watchBufferLength uint16
usersetBatchSize uint16
maxRetries uint8

optimizedRevisionQuery string
Expand Down
Loading

0 comments on commit c16cb90

Please sign in to comment.