diff --git a/internal/datastore/crdb/readwrite.go b/internal/datastore/crdb/readwrite.go index c47c47b857..3d30967402 100644 --- a/internal/datastore/crdb/readwrite.go +++ b/internal/datastore/crdb/readwrite.go @@ -14,6 +14,7 @@ import ( pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) @@ -96,6 +97,10 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [ bulkTouch := queryTouchTuple var bulkTouchCount int64 + bulkDelete := queryDeleteTuples + bulkDeleteOr := sq.Or{} + var bulkDeleteCount int64 + // Process the actual updates for _, mutation := range mutations { rel := mutation.Tuple @@ -139,20 +144,27 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [ bulkWriteCount++ case core.RelationTupleUpdate_DELETE: rwt.relCountChange-- - sql, args, err := queryDeleteTuples.Where(exactRelationshipClause(rel)).ToSql() - if err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) - } + bulkDeleteOr = append(bulkDeleteOr, exactRelationshipClause(rel)) + bulkDeleteCount++ - if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) - } default: log.Ctx(ctx).Error().Stringer("operation", mutation.Operation).Msg("unknown operation type") return fmt.Errorf("unknown mutation operation: %s", mutation.Operation) } } + if bulkDeleteCount > 0 { + bulkDelete = bulkDelete.Where(bulkDeleteOr) + sql, args, err := bulkDelete.ToSql() + if err != nil { + return fmt.Errorf(errUnableToWriteRelationships, err) + } + + if _, err := rwt.tx.Exec(ctx, sql, args...); err != nil { + return fmt.Errorf(errUnableToWriteRelationships, err) + } + } + bulkUpdateQueries := make([]sq.InsertBuilder, 0, 2) if bulkWriteCount > 0 { bulkUpdateQueries = append(bulkUpdateQueries, bulkWrite) @@ -186,7 +198,7 @@ func exactRelationshipClause(r *core.RelationTuple) sq.Eq { } } -func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { +func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { // Add clauses for the ResourceFilter query := queryDeleteTuples.Where(sq.Eq{colNamespace: filter.ResourceType}) if filter.OptionalResourceId != "" { @@ -208,19 +220,34 @@ func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1 } rwt.addOverlapKey(subjectFilter.SubjectType) } + + // Add the limit, if any. + delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) + var delLimit uint64 + if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { + delLimit = *delOpts.DeleteLimit + } + + if delLimit > 0 { + query = query.Limit(delLimit) + } + sql, args, err := query.ToSql() if err != nil { - return fmt.Errorf(errUnableToDeleteRelationships, err) + return false, fmt.Errorf(errUnableToDeleteRelationships, err) } modified, err := rwt.tx.Exec(ctx, sql, args...) if err != nil { - return fmt.Errorf(errUnableToDeleteRelationships, err) + return false, fmt.Errorf(errUnableToDeleteRelationships, err) } rwt.relCountChange -= modified.RowsAffected() + if delLimit > 0 && uint64(modified.RowsAffected()) == delLimit { + return true, nil + } - return nil + return false, nil } func (rwt *crdbReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error { diff --git a/internal/datastore/memdb/readwrite.go b/internal/datastore/memdb/readwrite.go index 0904e6d859..a3ff3088e9 100644 --- a/internal/datastore/memdb/readwrite.go +++ b/internal/datastore/memdb/readwrite.go @@ -11,6 +11,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" ) @@ -117,38 +118,53 @@ func (rwt *memdbReadWriteTx) toCaveatReference(mutation *core.RelationTupleUpdat return cr } -func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter) error { +func (rwt *memdbReadWriteTx) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { rwt.mustLock() defer rwt.Unlock() tx, err := rwt.txSource() if err != nil { - return err + return false, err + } + + delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) + var delLimit uint64 + if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { + delLimit = *delOpts.DeleteLimit } - return rwt.deleteWithLock(tx, filter) + return rwt.deleteWithLock(tx, filter, delLimit) } // caller must already hold the concurrent access lock -func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter) error { +func (rwt *memdbReadWriteTx) deleteWithLock(tx *memdb.Txn, filter *v1.RelationshipFilter, limit uint64) (bool, error) { // Create an iterator to find the relevant tuples bestIter, err := iteratorForFilter(tx, datastore.RelationshipsFilterFromPublicFilter(filter)) if err != nil { - return err + return false, err } filteredIter := memdb.NewFilterIterator(bestIter, relationshipFilterFilterFunc(filter)) // Collect the tuples into a slice of mutations for the changelog var mutations []*core.RelationTupleUpdate + var counter uint64 + + metLimit := false for row := filteredIter.Next(); row != nil; row = filteredIter.Next() { rt, err := row.(*relationship).RelationTuple() if err != nil { - return err + return false, err } mutations = append(mutations, tuple.Delete(rt)) + counter++ + + if limit > 0 && counter == limit { + metLimit = true + break + } } - return rwt.write(tx, mutations...) + return metLimit, rwt.write(tx, mutations...) } func (rwt *memdbReadWriteTx) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error { @@ -201,9 +217,9 @@ func (rwt *memdbReadWriteTx) DeleteNamespaces(_ context.Context, nsNames ...stri } // Delete the relationships from the namespace - if err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{ + if _, err := rwt.deleteWithLock(tx, &v1.RelationshipFilter{ ResourceType: nsName, - }); err != nil { + }, 0); err != nil { return fmt.Errorf("unable to delete relationships from deleted namespace: %w", err) } } diff --git a/internal/datastore/mysql/readwrite.go b/internal/datastore/mysql/readwrite.go index 411e150e5d..1c075bd229 100644 --- a/internal/datastore/mysql/readwrite.go +++ b/internal/datastore/mysql/readwrite.go @@ -20,6 +20,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" @@ -213,7 +214,7 @@ func (rwt *mysqlReadWriteTXN) WriteRelationships(ctx context.Context, mutations return nil } -func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { +func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { // Add clauses for the ResourceFilter query := rwt.DeleteTupleQuery.Where(sq.Eq{colNamespace: filter.ResourceType}) if filter.OptionalResourceId != "" { @@ -236,16 +237,37 @@ func (rwt *mysqlReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v query = query.Set(colDeletedTxn, rwt.newTxnID) + // Add the limit, if any. + delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) + var delLimit uint64 + if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { + delLimit = *delOpts.DeleteLimit + } + + if delLimit > 0 { + query = query.Limit(delLimit) + } + querySQL, args, err := query.ToSql() if err != nil { - return fmt.Errorf(errUnableToDeleteRelationships, err) + return false, fmt.Errorf(errUnableToDeleteRelationships, err) } - if _, err := rwt.tx.ExecContext(ctx, querySQL, args...); err != nil { - return fmt.Errorf(errUnableToDeleteRelationships, err) + modified, err := rwt.tx.ExecContext(ctx, querySQL, args...) + if err != nil { + return false, fmt.Errorf(errUnableToDeleteRelationships, err) } - return nil + rowsAffected, err := modified.RowsAffected() + if err != nil { + return false, fmt.Errorf(errUnableToDeleteRelationships, err) + } + + if delLimit > 0 && uint64(rowsAffected) == delLimit { + return true, nil + } + + return false, nil } func (rwt *mysqlReadWriteTXN) WriteNamespaces(ctx context.Context, newNamespaces ...*core.NamespaceDefinition) error { diff --git a/internal/datastore/postgres/readwrite.go b/internal/datastore/postgres/readwrite.go index f473b2984d..64ffb9444a 100644 --- a/internal/datastore/postgres/readwrite.go +++ b/internal/datastore/postgres/readwrite.go @@ -5,10 +5,9 @@ import ( "errors" "fmt" + "github.com/authzed/spicedb/pkg/datastore/options" "github.com/authzed/spicedb/pkg/spiceerrors" - "github.com/authzed/spicedb/pkg/tuple" - sq "github.com/Masterminds/squirrel" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/jackc/pgx/v5" @@ -18,6 +17,7 @@ import ( pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/tuple" ) const ( @@ -48,7 +48,15 @@ var ( colCaveatContext, ) - deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + selectForDelete = psql.Select( + colNamespace, + colObjectID, + colRelation, + colUsersetNamespace, + colUsersetObjectID, + colUsersetRelation, + ).From(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) ) type pgReadWriteTXN struct { @@ -269,7 +277,71 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []* return nil } -func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { +func (rwt *pgReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { + delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) + if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { + return rwt.deleteRelationshipsWithLimit(ctx, filter, *delOpts.DeleteLimit) + } + + return false, rwt.deleteRelationships(ctx, filter) +} + +func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, filter *v1.RelationshipFilter, limit uint64) (bool, error) { + // Construct a select query for the relationships to be removed. + query := selectForDelete.Where(sq.Eq{colNamespace: filter.ResourceType}) + if filter.OptionalResourceId != "" { + query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId}) + } + if filter.OptionalRelation != "" { + query = query.Where(sq.Eq{colRelation: filter.OptionalRelation}) + } + + // Add clauses for the SubjectFilter + if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { + query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType}) + if subjectFilter.OptionalSubjectId != "" { + query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId}) + } + if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil { + query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) + } + } + + query = query.Limit(limit) + + selectSQL, args, err := query.ToSql() + if err != nil { + return false, fmt.Errorf(errUnableToDeleteRelationships, err) + } + + args = append(args, rwt.newXID) + if len(args) != 3 { + return false, spiceerrors.MustBugf("expected 3 arguments, got %d", len(args)) + } + + // Construct a CTE to update the relationships as removed. + cteSQL := fmt.Sprintf( + "WITH found_tuples AS (%s)\nUPDATE %s SET %s = $3 WHERE (%s, %s, %s, %s, %s, %s) IN (select * from found_tuples)", + selectSQL, + tableTuple, + colDeletedXid, + colNamespace, + colObjectID, + colRelation, + colUsersetNamespace, + colUsersetObjectID, + colUsersetRelation, + ) + + result, err := rwt.tx.Exec(ctx, cteSQL, args...) + if err != nil { + return false, fmt.Errorf(errUnableToDeleteRelationships, err) + } + + return result.RowsAffected() == int64(limit), nil +} + +func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { // Add clauses for the ResourceFilter query := deleteTuple.Where(sq.Eq{colNamespace: filter.ResourceType}) if filter.OptionalResourceId != "" { diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index d9ddf37fb4..5d1ebcb039 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -299,7 +299,7 @@ func (rwt *observableRWT) DeleteNamespaces(ctx context.Context, nsNames ...strin return rwt.delegate.DeleteNamespaces(ctx, nsNames...) } -func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { +func (rwt *observableRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) { ctx, closer := observe(ctx, "DeleteRelationships", trace.WithAttributes( filterToAttributes(filter)..., )) diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index 57890c0314..2f40526b2e 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -242,9 +242,9 @@ func (dm *MockReadWriteTransaction) WriteRelationships(_ context.Context, mutati return args.Error(0) } -func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter) error { +func (dm *MockReadWriteTransaction) DeleteRelationships(_ context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) { args := dm.Called(filter) - return args.Error(0) + return false, args.Error(0) } func (dm *MockReadWriteTransaction) WriteNamespaces(_ context.Context, newConfigs ...*core.NamespaceDefinition) error { diff --git a/internal/datastore/spanner/reader.go b/internal/datastore/spanner/reader.go index 8a34ae66e1..9ebb93a840 100644 --- a/internal/datastore/spanner/reader.go +++ b/internal/datastore/spanner/reader.go @@ -234,6 +234,15 @@ var queryTuples = sql.Select( colCaveatContext, ).From(tableRelationship) +var queryTuplesForDelete = sql.Select( + colNamespace, + colObjectID, + colRelation, + colUsersetNamespace, + colUsersetObjectID, + colUsersetRelation, +).From(tableRelationship) + var schema = common.NewSchemaInformation( colNamespace, colObjectID, diff --git a/internal/datastore/spanner/readwrite.go b/internal/datastore/spanner/readwrite.go index c13e831fe3..ede8d341b7 100644 --- a/internal/datastore/spanner/readwrite.go +++ b/internal/datastore/spanner/readwrite.go @@ -12,7 +12,9 @@ import ( log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/spiceerrors" ) type spannerReadWriteTXN struct { @@ -21,6 +23,8 @@ type spannerReadWriteTXN struct { disableStats bool } +const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits + func (rwt spannerReadWriteTXN) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error { var rowCountChange int64 for _, mutation := range mutations { @@ -68,65 +72,140 @@ func spannerMutation( return } -func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { - if err := deleteWithFilter(ctx, rwt.spannerRWT, filter, rwt.disableStats); err != nil { - return fmt.Errorf(errUnableToDeleteRelationships, err) +func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { + limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, rwt.disableStats, opts...) + if err != nil { + return false, fmt.Errorf(errUnableToDeleteRelationships, err) } - return nil + return limitReached, nil +} + +func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, opts ...options.DeleteOptionsOption) (bool, error) { + delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) + var delLimit uint64 + if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { + delLimit = *delOpts.DeleteLimit + if delLimit > inLimit { + return false, spiceerrors.MustBugf("delete limit %d exceeds maximum of %d in spanner", delLimit, inLimit) + } + } + + var numDeleted int64 + if delLimit > 0 { + nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, disableStats, delLimit) + if err != nil { + return false, err + } + numDeleted = nu + } else { + nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter, disableStats) + if err != nil { + return false, err + } + + numDeleted = nu + } + + if !disableStats { + if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil { + return false, err + } + } + + if delLimit > 0 && uint64(numDeleted) == delLimit { + return true, nil + } + + return false, nil } -type selectAndDelete struct { - sel sq.SelectBuilder - del sq.DeleteBuilder +func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, delLimit uint64) (int64, error) { + query := queryTuplesForDelete + query = applyFilterToQuery(query, filter) + query = query.Limit(delLimit) + + sql, args, err := query.ToSql() + if err != nil { + return -1, err + } + + mutations := make([]*spanner.Mutation, 0, delLimit) + + // Load the relationships to be deleted. + iter := rwt.Query(ctx, statementFromSQL(sql, args)) + defer iter.Stop() + + if err := iter.Do(func(row *spanner.Row) error { + nextTuple := &core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{}, + Subject: &core.ObjectAndRelation{}, + } + err := row.Columns( + &nextTuple.ResourceAndRelation.Namespace, + &nextTuple.ResourceAndRelation.ObjectId, + &nextTuple.ResourceAndRelation.Relation, + &nextTuple.Subject.Namespace, + &nextTuple.Subject.ObjectId, + &nextTuple.Subject.Relation, + ) + if err != nil { + return err + } + + mutations = append(mutations, spanner.Delete(tableRelationship, keyFromRelationship(nextTuple))) + return nil + }); err != nil { + return -1, err + } + + // Delete the relationships. + if err := rwt.BufferWrite(mutations); err != nil { + return -1, fmt.Errorf(errUnableToWriteRelationships, err) + } + + return int64(len(mutations)), nil } -func (snd selectAndDelete) Where(pred any, args ...any) selectAndDelete { - snd.sel = snd.sel.Where(pred, args...) - snd.del = snd.del.Where(pred, args...) - return snd +func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) (int64, error) { + query := sql.Delete(tableRelationship) + query = applyFilterToQuery(query, filter) + + sql, args, err := query.ToSql() + if err != nil { + return -1, err + } + + deleteStatement := statementFromSQL(sql, args) + return rwt.Update(ctx, deleteStatement) } -func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) error { - queries := selectAndDelete{queryTuples, sql.Delete(tableRelationship)} +type builder[T any] interface { + Where(pred interface{}, args ...interface{}) T +} +func applyFilterToQuery[T builder[T]](query T, filter *v1.RelationshipFilter) T { // Add clauses for the ResourceFilter - queries = queries.Where(sq.Eq{colNamespace: filter.ResourceType}) + query = query.Where(sq.Eq{colNamespace: filter.ResourceType}) if filter.OptionalResourceId != "" { - queries = queries.Where(sq.Eq{colObjectID: filter.OptionalResourceId}) + query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId}) } if filter.OptionalRelation != "" { - queries = queries.Where(sq.Eq{colRelation: filter.OptionalRelation}) + query = query.Where(sq.Eq{colRelation: filter.OptionalRelation}) } // Add clauses for the SubjectFilter if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { - queries = queries.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType}) + query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType}) if subjectFilter.OptionalSubjectId != "" { - queries = queries.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId}) + query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId}) } if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil { - queries = queries.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) - } - } - - sql, args, err := queries.del.ToSql() - if err != nil { - return err - } - - numDeleted, err := rwt.Update(ctx, statementFromSQL(sql, args)) - if err != nil { - return err - } - - if !disableStats { - if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil { - return err + query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) } } - return nil + return query } func upsertVals(r *core.RelationTuple) []any { @@ -181,7 +260,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ... func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error { for _, nsName := range nsNames { relFilter := &v1.RelationshipFilter{ResourceType: nsName} - if err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil { + if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil { return fmt.Errorf(errUnableToDeleteConfig, err) } diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index f75f220f14..b76e5efd6d 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -28,7 +28,6 @@ import ( "github.com/authzed/spicedb/pkg/datastore/pagination" "github.com/authzed/spicedb/pkg/genutil/mapz" "github.com/authzed/spicedb/pkg/middleware/consistency" - core "github.com/authzed/spicedb/pkg/proto/core/v1" dispatchv1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/tuple" "github.com/authzed/spicedb/pkg/zedtoken" @@ -351,12 +350,19 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del return err } - var deleteMutations []*core.RelationTupleUpdate + usagemetrics.SetInContext(ctx, &dispatchv1.ResponseMeta{ + // One request per precondition and one request for the actual delete. + DispatchCount: uint32(len(req.OptionalPreconditions)) + 1, + }) - if req.OptionalLimit > 0 { - limit := uint64(req.OptionalLimit) - deleteMutations = make([]*core.RelationTupleUpdate, 0, limit) + if err := checkPreconditions(ctx, rwt, req.OptionalPreconditions); err != nil { + return err + } + // If a limit was specified but partial deletion is not allowed, we need to check if the + // number of relationships to be deleted exceeds the limit. + if req.OptionalLimit > 0 && !req.OptionalAllowPartialDeletions { + limit := uint64(req.OptionalLimit) limitPlusOne := limit + 1 filter := datastore.RelationshipsFilterFromPublicFilter(req.RelationshipFilter) @@ -366,39 +372,39 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del } defer iter.Close() + counter := 0 for tpl := iter.Next(); tpl != nil; tpl = iter.Next() { if iter.Err() != nil { return ps.rewriteError(ctx, err) } - if len(deleteMutations) == int(limit) { - deletionProgress = v1.DeleteRelationshipsResponse_DELETION_PROGRESS_PARTIAL - if !req.OptionalAllowPartialDeletions { - return ps.rewriteError(ctx, NewCouldNotTransactionallyDeleteErr(req.RelationshipFilter, req.OptionalLimit)) - } - - break + if counter == int(limit) { + return ps.rewriteError(ctx, NewCouldNotTransactionallyDeleteErr(req.RelationshipFilter, req.OptionalLimit)) } - deleteMutations = append(deleteMutations, tuple.Delete(tpl)) + counter++ } iter.Close() } - usagemetrics.SetInContext(ctx, &dispatchv1.ResponseMeta{ - // One request per precondition and one request for the actual delete. - DispatchCount: uint32(len(req.OptionalPreconditions)) + 1, - }) + // Delete with the specified limit. + if req.OptionalLimit > 0 { + deleteLimit := uint64(req.OptionalLimit) + reachedLimit, err := rwt.DeleteRelationships(ctx, req.RelationshipFilter, options.WithDeleteLimit(&deleteLimit)) + if err != nil { + return err + } - if err := checkPreconditions(ctx, rwt, req.OptionalPreconditions); err != nil { - return err - } + if reachedLimit { + deletionProgress = v1.DeleteRelationshipsResponse_DELETION_PROGRESS_PARTIAL + } - if len(deleteMutations) > 0 { - return rwt.WriteRelationships(ctx, deleteMutations) + return nil } - return rwt.DeleteRelationships(ctx, req.RelationshipFilter) + // Otherwise, kick off an unlimited deletion. + _, err := rwt.DeleteRelationships(ctx, req.RelationshipFilter) + return err }) if err != nil { return nil, ps.rewriteError(ctx, err) diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 2683fdfbf4..e3993dc9e0 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -1122,10 +1122,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { }) require.NoError(err) - headRev, err = ds.HeadRevision(context.Background()) - require.NoError(err) - - afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + afterDelete := readOfType(require, "document", client, resp.DeletedAt) require.LessOrEqual(len(beforeDelete)-len(afterDelete), batchSize) if i == 0 { diff --git a/internal/testfixtures/validating.go b/internal/testfixtures/validating.go index b6b359f7ad..3c159f6cb5 100644 --- a/internal/testfixtures/validating.go +++ b/internal/testfixtures/validating.go @@ -205,12 +205,12 @@ func (vrwt validatingReadWriteTransaction) WriteRelationships(ctx context.Contex return vrwt.delegate.WriteRelationships(ctx, mutations) } -func (vrwt validatingReadWriteTransaction) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { +func (vrwt validatingReadWriteTransaction) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) { if err := filter.Validate(); err != nil { - return err + return false, err } - return vrwt.delegate.DeleteRelationships(ctx, filter) + return vrwt.delegate.DeleteRelationships(ctx, filter, options...) } func (vrwt validatingReadWriteTransaction) WriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error { diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 86b206f0fc..722455a090 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -285,8 +285,12 @@ type ReadWriteTransaction interface { // WriteRelationships takes a list of tuple mutations and applies them to the datastore. WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error - // DeleteRelationships deletes all Relationships that match the provided filter. - DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error + // DeleteRelationships deletes relationships that match the provided filter, with + // the optional limit. If a limit is provided and reached, the method will return + // true as the first return value. Otherwise, the boolean can be ignored. + DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, + options ...options.DeleteOptionsOption, + ) (bool, error) // WriteNamespaces takes proto namespace definitions and persists them. WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error diff --git a/pkg/datastore/options/options.go b/pkg/datastore/options/options.go index a9fa355ca0..34daf2e58a 100644 --- a/pkg/datastore/options/options.go +++ b/pkg/datastore/options/options.go @@ -5,6 +5,7 @@ import ( ) //go:generate go run github.com/ecordell/optgen -output zz_generated.query_options.go . QueryOptions ReverseQueryOptions RWTOptions +//go:generate go run github.com/ecordell/optgen -output zz_generated.delete_options.go . DeleteOptions // SortOrder is an enum which represents the order in which the caller would like // the data returned. @@ -53,6 +54,12 @@ type RWTOptions struct { DisableRetries bool `debugmap:"visible"` } +// DeleteOptions are the options that can affect the results of a delete relationships +// operation. +type DeleteOptions struct { + DeleteLimit *uint64 `debugmap:"visible"` +} + var ( one = uint64(1) diff --git a/pkg/datastore/options/zz_generated.delete_options.go b/pkg/datastore/options/zz_generated.delete_options.go new file mode 100644 index 0000000000..839c9ade3b --- /dev/null +++ b/pkg/datastore/options/zz_generated.delete_options.go @@ -0,0 +1,65 @@ +// Code generated by github.com/ecordell/optgen. DO NOT EDIT. +package options + +import ( + defaults "github.com/creasty/defaults" + helpers "github.com/ecordell/optgen/helpers" +) + +type DeleteOptionsOption func(d *DeleteOptions) + +// NewDeleteOptionsWithOptions creates a new DeleteOptions with the passed in options set +func NewDeleteOptionsWithOptions(opts ...DeleteOptionsOption) *DeleteOptions { + d := &DeleteOptions{} + for _, o := range opts { + o(d) + } + return d +} + +// NewDeleteOptionsWithOptionsAndDefaults creates a new DeleteOptions with the passed in options set starting from the defaults +func NewDeleteOptionsWithOptionsAndDefaults(opts ...DeleteOptionsOption) *DeleteOptions { + d := &DeleteOptions{} + defaults.MustSet(d) + for _, o := range opts { + o(d) + } + return d +} + +// ToOption returns a new DeleteOptionsOption that sets the values from the passed in DeleteOptions +func (d *DeleteOptions) ToOption() DeleteOptionsOption { + return func(to *DeleteOptions) { + to.DeleteLimit = d.DeleteLimit + } +} + +// DebugMap returns a map form of DeleteOptions for debugging +func (d DeleteOptions) DebugMap() map[string]any { + debugMap := map[string]any{} + debugMap["DeleteLimit"] = helpers.DebugValue(d.DeleteLimit, false) + return debugMap +} + +// DeleteOptionsWithOptions configures an existing DeleteOptions with the passed in options set +func DeleteOptionsWithOptions(d *DeleteOptions, opts ...DeleteOptionsOption) *DeleteOptions { + for _, o := range opts { + o(d) + } + return d +} + +// WithOptions configures the receiver DeleteOptions with the passed in options set +func (d *DeleteOptions) WithOptions(opts ...DeleteOptionsOption) *DeleteOptions { + for _, o := range opts { + o(d) + } + return d +} + +// WithDeleteLimit returns an option that can set DeleteLimit on a DeleteOptions +func WithDeleteLimit(deleteLimit *uint64) DeleteOptionsOption { + return func(d *DeleteOptions) { + d.DeleteLimit = deleteLimit + } +} diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 56701cd81f..a1a79dd6ef 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -98,10 +98,13 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) t.Run("TestWriteDeleteWrite", func(t *testing.T) { WriteDeleteWriteTest(t, tester) }) t.Run("TestCreateAlreadyExisting", func(t *testing.T) { CreateAlreadyExistingTest(t, tester) }) t.Run("TestTouchAlreadyExisting", func(t *testing.T) { TouchAlreadyExistingTest(t, tester) }) - t.Run("TestCreateDeleteTouchTest", func(t *testing.T) { CreateDeleteTouchTest(t, tester) }) - t.Run("TestCreateTouchDeleteTouchTest", func(t *testing.T) { CreateTouchDeleteTouchTest(t, tester) }) + t.Run("TestCreateDeleteTouch", func(t *testing.T) { CreateDeleteTouchTest(t, tester) }) + t.Run("TestDeleteOneThousandIndividualInOneCall", func(t *testing.T) { DeleteOneThousandIndividualInOneCallTest(t, tester) }) + t.Run("TestCreateTouchDeleteTouch", func(t *testing.T) { CreateTouchDeleteTouchTest(t, tester) }) t.Run("TestTouchAlreadyExistingCaveated", func(t *testing.T) { TouchAlreadyExistingCaveatedTest(t, tester) }) t.Run("TestBulkDeleteRelationships", func(t *testing.T) { BulkDeleteRelationshipsTest(t, tester) }) + t.Run("TestDeleteCaveatedTuple", func(t *testing.T) { DeleteCaveatedTupleTest(t, tester) }) + t.Run("TestDeleteWithLimit", func(t *testing.T) { DeleteWithLimitTest(t, tester) }) t.Run("TestMultipleReadsInRWT", func(t *testing.T) { MultipleReadsInRWTTest(t, tester) }) t.Run("TestConcurrentWriteSerialization", func(t *testing.T) { ConcurrentWriteSerializationTest(t, tester) }) diff --git a/pkg/datastore/test/tuples.go b/pkg/datastore/test/tuples.go index a43994036e..108685fff6 100644 --- a/pkg/datastore/test/tuples.go +++ b/pkg/datastore/test/tuples.go @@ -237,7 +237,7 @@ func SimpleTest(t *testing.T, tester DatastoreTester) { // Delete with DeleteRelationship deletedAt, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ + _, err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ ResourceType: testResourceNamespace, }) require.NoError(err) @@ -400,7 +400,7 @@ func DeleteRelationshipsTest(t *testing.T, tester DatastoreTester) { require.NoError(err) deletedAt, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - err := rwt.DeleteRelationships(ctx, tt.filter) + _, err := rwt.DeleteRelationships(ctx, tt.filter) require.NoError(err) return err }) @@ -636,6 +636,118 @@ func CreateDeleteTouchTest(t *testing.T, tester DatastoreTester) { ensureTuples(ctx, require, ds, tpl1, tpl2) } +// DeleteOneThousandIndividualInOneCallTest tests deleting 1000 relationships, individually. +func DeleteOneThousandIndividualInOneCallTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) + ctx := context.Background() + + // Write the 1000 relationships. + tuples := make([]*core.RelationTuple, 0, 1000) + for i := 0; i < 1000; i++ { + tpl := makeTestTuple("foo", fmt.Sprintf("user%d", i)) + tuples = append(tuples, tpl) + } + + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tuples...) + require.NoError(err) + ensureTuples(ctx, require, ds, tuples...) + + // Add an extra tuple. + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, makeTestTuple("foo", "extra")) + require.NoError(err) + ensureTuples(ctx, require, ds, makeTestTuple("foo", "extra")) + + // Delete the first 1000 tuples. + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, tuples...) + require.NoError(err) + ensureNotTuples(ctx, require, ds, tuples...) + + // Ensure the extra tuple is still present. + ensureTuples(ctx, require, ds, makeTestTuple("foo", "extra")) +} + +// DeleteWithLimitTest tests deleting relationships with a limit. +func DeleteWithLimitTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) + ctx := context.Background() + + // Write the 1000 relationships. + tuples := make([]*core.RelationTuple, 0, 1000) + for i := 0; i < 1000; i++ { + tpl := makeTestTuple("foo", fmt.Sprintf("user%d", i)) + tuples = append(tuples, tpl) + } + + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tuples...) + require.NoError(err) + ensureTuples(ctx, require, ds, tuples...) + + // Delete 100 tuples. + var deleteLimit uint64 = 100 + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + limitReached, err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ + ResourceType: testResourceNamespace, + }, options.WithDeleteLimit(&deleteLimit)) + require.NoError(err) + require.True(limitReached) + return nil + }) + require.NoError(err) + + // Ensure 900 tuples remain. + found := countTuples(ctx, require, ds, testResourceNamespace) + require.Equal(900, found) + + // Delete the remainder. + deleteLimit = 1000 + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + limitReached, err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ + ResourceType: testResourceNamespace, + }, options.WithDeleteLimit(&deleteLimit)) + require.NoError(err) + require.False(limitReached) + return nil + }) + require.NoError(err) + + found = countTuples(ctx, require, ds, testResourceNamespace) + require.Equal(0, found) +} + +// DeleteCaveatedTupleTest tests deleting a relationship with a caveat. +func DeleteCaveatedTupleTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) + ctx := context.Background() + + tpl := tuple.Parse("test/resource:someresource#viewer@test/user:someuser[somecaveat]") + + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_CREATE, tpl) + require.NoError(err) + ensureTuples(ctx, require, ds, tpl) + + // Delete the tuple. + withoutCaveat := tuple.Parse("test/resource:someresource#viewer@test/user:someuser") + + _, err = common.WriteTuples(ctx, ds, core.RelationTupleUpdate_DELETE, withoutCaveat) + require.NoError(err) + ensureNotTuples(ctx, require, ds, tpl, withoutCaveat) +} + // CreateTouchDeleteTouchTest tests writing a relationship, touching it, deleting it, and then touching it. func CreateTouchDeleteTouchTest(t *testing.T, tester DatastoreTester) { require := require.New(t) @@ -814,10 +926,11 @@ func BulkDeleteRelationshipsTest(t *testing.T, tester DatastoreTester) { deletedRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { t.Log(time.Now(), "deleting") deleteCount++ - return rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ + _, err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ ResourceType: testResourceNamespace, OptionalRelation: testReaderRelation, }) + return err }) require.NoError(err) require.Equal(1, deleteCount) @@ -888,3 +1001,28 @@ func ensureTuplesStatus(ctx context.Context, require *require.Assertions, ds dat } } } + +func countTuples(ctx context.Context, require *require.Assertions, ds datastore.Datastore, resourceType string) int { + headRev, err := ds.HeadRevision(ctx) + require.NoError(err) + + reader := ds.SnapshotReader(headRev) + + iter, err := reader.QueryRelationships(ctx, datastore.RelationshipsFilter{ + ResourceType: resourceType, + }) + require.NoError(err) + defer iter.Close() + + counter := 0 + for { + rel := iter.Next() + if rel == nil { + break + } + + counter++ + } + + return counter +} diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index 93efd9af6f..0191796bbc 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -108,7 +108,7 @@ func WatchTest(t *testing.T, tester DatastoreTester) { testUpdates = append(testUpdates, batch, []*core.RelationTupleUpdate{deleteUpdate}) _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { - err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ + _, err := rwt.DeleteRelationships(ctx, &v1.RelationshipFilter{ ResourceType: testResourceNamespace, OptionalRelation: testReaderRelation, OptionalSubjectFilter: &v1.SubjectFilter{