Skip to content

Commit

Permalink
Copy-paste to simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
tstirrat15 committed Sep 17, 2024
1 parent 618a77e commit 43189cb
Show file tree
Hide file tree
Showing 5 changed files with 732 additions and 177 deletions.
2 changes: 2 additions & 0 deletions internal/services/v1/bulkcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type bulkChecker struct {
dispatchChunkSize uint16
}

// maxBulkCheckCount is the maximum number of individual permission checks
// accepted in a single bulk request — presumably enforced inside
// checkBulkPermissions (body not visible here); confirm against the full file.
const maxBulkCheckCount = 10000

func (bc *bulkChecker) checkBulkPermissions(ctx context.Context, req *v1.CheckBulkPermissionsRequest) (*v1.CheckBulkPermissionsResponse, error) {
atRevision, checkedAt, err := consistency.RevisionFromContext(ctx)
if err != nil {
Expand Down
247 changes: 80 additions & 167 deletions internal/services/v1/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1
import (
"context"
"errors"
"io"
"slices"
"sort"
"strings"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/authzed/spicedb/internal/middleware/handwrittenvalidation"
"github.com/authzed/spicedb/internal/middleware/streamtimeout"
"github.com/authzed/spicedb/internal/middleware/usagemetrics"
"github.com/authzed/spicedb/internal/relationships"
"github.com/authzed/spicedb/internal/services/shared"
"github.com/authzed/spicedb/internal/services/v1/options"
"github.com/authzed/spicedb/pkg/cursor"
Expand Down Expand Up @@ -100,27 +102,83 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis
streamtimeout.MustStreamServerInterceptor(config.StreamReadTimeout),
),
},
maxBatchSize: uint64(config.MaxExportBatchSize),
bulkChecker: &bulkChecker{
maxAPIDepth: permServerConfig.MaximumAPIDepth,
maxCaveatContextSize: permServerConfig.MaxCaveatContextSize,
maxConcurrency: config.BulkCheckMaxConcurrency,
dispatch: dispatch,
dispatchChunkSize: chunkSize,
},
bulkImporter: &bulkImporter{},
bulkExporter: &bulkExporter{
maxBatchSize: uint64(config.MaxExportBatchSize),
},
}
}

// experimentalServer implements the v1 ExperimentalService, delegating the
// bulk-check work to the embedded bulkChecker helper.
type experimentalServer struct {
	v1.UnimplementedExperimentalServiceServer
	shared.WithServiceSpecificInterceptors

	// maxBatchSize bounds the export page size passed to BulkExport by
	// BulkExportRelationships.
	// NOTE(review): this view also shows bulkExporter.maxBatchSize being used
	// for the same purpose elsewhere — confirm which field is canonical.
	maxBatchSize uint64

	bulkChecker  *bulkChecker
	bulkImporter *bulkImporter
	bulkExporter *bulkExporter
}

// bulkLoadAdapter adapts a BulkImportRelationships gRPC stream into a
// RelationTuple iterator (see Next), validating each relationship as it is
// consumed.
type bulkLoadAdapter struct {
	stream                 v1.ExperimentalService_BulkImportRelationshipsServer
	referencedNamespaceMap map[string]*typesystem.TypeSystem
	referencedCaveatMap    map[string]*core.CaveatDefinition

	// current and caveat are reused as scratch space for every converted
	// tuple; the pointer Next returns aliases current and is only valid
	// until the following call.
	current core.RelationTuple
	caveat  core.ContextualizedCaveat

	// Namespaces/caveats referenced by the current batch but absent from the
	// maps above; Next yields (nil, nil) until the caller fills them in.
	awaitingNamespaces []string
	awaitingCaveats    []string

	// currentBatch holds the most recently received batch; numSent indexes
	// the next relationship in it to convert and return.
	currentBatch []*v1.Relationship
	numSent      int
	err          error
}

// Next returns the next validated relationship from the underlying import
// stream, converted into a core.RelationTuple. It returns (nil, nil) in two
// distinct situations: a clean end-of-stream (io.EOF from Recv), and when the
// current batch references namespaces or caveats the caller has not yet
// resolved (see awaitingNamespaces/awaitingCaveats). The returned pointer
// aliases a.current and is only valid until the following call.
func (a *bulkLoadAdapter) Next(_ context.Context) (*core.RelationTuple, error) {
	// Refill whenever the previous batch has been fully consumed.
	for a.err == nil && a.numSent == len(a.currentBatch) {
		// Load a new batch
		batch, err := a.stream.Recv()
		if err != nil {
			a.err = err
			if errors.Is(a.err, io.EOF) {
				// Clean end-of-stream is not surfaced as an error.
				return nil, nil
			}
			return nil, a.err
		}

		a.currentBatch = batch.Relationships
		a.numSent = 0

		a.awaitingNamespaces, a.awaitingCaveats = extractBatchNewReferencedNamespacesAndCaveats(
			a.currentBatch,
			a.referencedNamespaceMap,
			a.referencedCaveatMap,
		)
	}

	if len(a.awaitingNamespaces) > 0 || len(a.awaitingCaveats) > 0 {
		// Shut down the stream to give our caller a chance to fill in this information
		return nil, nil
	}

	// Convert in place into the reusable scratch tuple; reset the caveat
	// pointer and clear any prior integrity data so nothing leaks between
	// successive relationships.
	// NOTE(review): calling Next again after it has reported exhaustion would
	// index past currentBatch here — callers are presumably expected to stop
	// at the first (nil, nil); confirm the iterator contract.
	a.current.Caveat = &a.caveat
	a.current.Integrity = nil
	tuple.CopyRelationshipToRelationTuple(a.currentBatch[a.numSent], &a.current)

	if err := relationships.ValidateOneRelationship(
		a.referencedNamespaceMap,
		a.referencedCaveatMap,
		&a.current,
		relationships.ValidateRelationshipForCreateOrTouch,
	); err != nil {
		return nil, err
	}

	a.numSent++
	return &a.current, nil
}

func extractBatchNewReferencedNamespacesAndCaveats(
Expand All @@ -147,165 +205,8 @@ func extractBatchNewReferencedNamespacesAndCaveats(
return lo.Keys(newNamespaces), lo.Keys(newCaveats)
}

// BulkImportRelationships is the experimental bulk-import entrypoint.
// NOTE(review): the body is empty in this view, which would fail to compile in
// Go ("missing return") — this looks like a truncated diff artifact of the
// rewritten handler shown later in the file; confirm against the full source.
func (es *experimentalServer) BulkImportRelationships(stream grpc.ClientStreamingServer[v1.BulkImportRelationshipsRequest, v1.BulkImportRelationshipsResponse]) error {
}

// BulkExportRelationships streams every relationship matching the request
// back to the caller, delegating the heavy lifting to BulkExport.
func (es *experimentalServer) BulkExportRelationships(
	req *v1.BulkExportRelationshipsRequest,
	resp grpc.ServerStreamingServer[v1.BulkExportRelationshipsResponse],
) error {
	// Resolve the revision at which the export should run; abort on
	// consistency errors before touching the datastore.
	streamCtx := resp.Context()
	revision, _, revErr := consistency.RevisionFromContext(streamCtx)
	if revErr != nil {
		return shared.RewriteErrorWithoutConfig(streamCtx, revErr)
	}

	datastore := datastoremw.MustFromContext(streamCtx)
	return BulkExport(streamCtx, datastore, es.bulkExporter.maxBatchSize, req, revision, resp.Send)
}

// BulkExport implements the BulkExportRelationships API functionality. Given a
// datastore.Datastore, it streams, via the sender, all relationships matched
// by the incoming request. If no cursor is provided, it falls back to the
// provided revision.
//
// NOTE(review): the sender parameter is typed *v1.ExportBulkRelationshipsResponse
// while the body constructs *v1.BulkExportRelationshipsResponse — confirm these
// are aliases (or the same message) in the generated API; this view is a merged
// diff and may be mixing old and new identifiers.
func BulkExport(ctx context.Context, ds datastore.Datastore, batchSize uint64, req *v1.ExportBulkRelationshipsRequest, fallbackRevision datastore.Revision, sender func(response *v1.ExportBulkRelationshipsResponse) error) error {
	// Reject client-requested limits that exceed the server-configured maximum.
	if req.OptionalLimit > 0 && uint64(req.OptionalLimit) > batchSize {
		return shared.RewriteErrorWithoutConfig(ctx, NewExceedsMaximumLimitErr(uint64(req.OptionalLimit), batchSize))
	}

	atRevision := fallbackRevision
	var curNamespace string
	var cur dsoptions.Cursor
	if req.OptionalCursor != nil {
		// A cursor overrides the fallback revision: resume at the same
		// snapshot revision, namespace, and tuple position as the prior page.
		var err error
		atRevision, curNamespace, cur, err = decodeCursor(ds, req.OptionalCursor)
		if err != nil {
			return shared.RewriteErrorWithoutConfig(ctx, err)
		}
	}

	reader := ds.SnapshotReader(atRevision)

	namespaces, err := reader.ListAllNamespaces(ctx)
	if err != nil {
		return shared.RewriteErrorWithoutConfig(ctx, err)
	}

	// Make sure the namespaces are always in a stable order
	slices.SortFunc(namespaces, func(
		lhs datastore.RevisionedDefinition[*core.NamespaceDefinition],
		rhs datastore.RevisionedDefinition[*core.NamespaceDefinition],
	) int {
		return strings.Compare(lhs.Definition.Name, rhs.Definition.Name)
	})

	// Skip the namespaces that are already fully returned
	for cur != nil && len(namespaces) > 0 && namespaces[0].Definition.Name < curNamespace {
		namespaces = namespaces[1:]
	}

	limit := batchSize
	if req.OptionalLimit > 0 {
		limit = uint64(req.OptionalLimit)
	}

	// Pre-allocate all of the relationships that we might need in order to
	// make export easier and faster for the garbage collector.
	relsArray := make([]v1.Relationship, limit)
	objArray := make([]v1.ObjectReference, limit)
	subArray := make([]v1.SubjectReference, limit)
	subObjArray := make([]v1.ObjectReference, limit)
	caveatArray := make([]v1.ContextualizedCaveat, limit)
	for i := range relsArray {
		relsArray[i].Resource = &objArray[i]
		relsArray[i].Subject = &subArray[i]
		relsArray[i].Subject.Object = &subObjArray[i]
	}

	emptyRels := make([]*v1.Relationship, limit)
	for _, ns := range namespaces {
		rels := emptyRels

		// Reset the cursor between namespaces.
		if ns.Definition.Name != curNamespace {
			cur = nil
		}

		// Skip this namespace if a resource type filter was specified.
		if req.OptionalRelationshipFilter != nil && req.OptionalRelationshipFilter.ResourceType != "" {
			if ns.Definition.Name != req.OptionalRelationshipFilter.ResourceType {
				continue
			}
		}

		// Setup the filter to use for the relationships.
		relationshipFilter := datastore.RelationshipsFilter{OptionalResourceType: ns.Definition.Name}
		if req.OptionalRelationshipFilter != nil {
			rf, err := datastore.RelationshipsFilterFromPublicFilter(req.OptionalRelationshipFilter)
			if err != nil {
				return shared.RewriteErrorWithoutConfig(ctx, err)
			}

			// Overload the namespace name with the one from the request, because each iteration is for a different namespace.
			rf.OptionalResourceType = ns.Definition.Name
			relationshipFilter = rf
		}

		// We want to keep iterating as long as we're sending full batches.
		// To bootstrap this loop, we enter the first time with a full rels
		// slice of dummy rels that were never sent.
		for uint64(len(rels)) == limit {
			// Lop off any rels we've already sent
			rels = rels[:0]

			// Each yielded tuple is copied into the next free slot of the
			// pre-allocated arrays instead of being freshly allocated.
			tplFn := func(tpl *core.RelationTuple) {
				offset := len(rels)
				rels = append(rels, &relsArray[offset]) // nozero
				tuple.CopyRelationTupleToRelationship(tpl, &relsArray[offset], &caveatArray[offset])
			}

			cur, err = queryForEach(
				ctx,
				reader,
				relationshipFilter,
				tplFn,
				dsoptions.WithLimit(&limit),
				dsoptions.WithAfter(cur),
				dsoptions.WithSort(dsoptions.ByResource),
			)
			if err != nil {
				return shared.RewriteErrorWithoutConfig(ctx, err)
			}

			if len(rels) == 0 {
				continue
			}

			// Encode a cursor pointing just past the last tuple sent so the
			// client can resume this export at the same snapshot revision.
			encoded, err := cursor.Encode(&implv1.DecodedCursor{
				VersionOneof: &implv1.DecodedCursor_V1{
					V1: &implv1.V1Cursor{
						Revision: atRevision.String(),
						Sections: []string{
							ns.Definition.Name,
							tuple.MustString(cur),
						},
					},
				},
			})
			if err != nil {
				return shared.RewriteErrorWithoutConfig(ctx, err)
			}

			if err := sender(&v1.BulkExportRelationshipsResponse{
				AfterResultCursor: encoded,
				Relationships:     rels,
			}); err != nil {
				return shared.RewriteErrorWithoutConfig(ctx, err)
			}
		}
	}
	return nil
}

func (es *experimentalServer) ImportBulkRelationships(stream v1.PermissionsService_ImportBulkRelationshipsServer) error {
// TODO: this is now duplicate code with ImportBulkRelationships
func (es *experimentalServer) BulkImportRelationships(stream v1.ExperimentalService_BulkImportRelationshipsServer) error {
ds := datastoremw.MustFromContext(stream.Context())

var numWritten uint64
Expand Down Expand Up @@ -377,6 +278,20 @@ func (es *experimentalServer) ImportBulkRelationships(stream v1.PermissionsServi
})
}

// BulkExportRelationships resolves the export revision from the request's
// consistency settings and hands off to the shared BulkExport helper.
//
// TODO: this is now duplicate code with ExportBulkRelationships
func (es *experimentalServer) BulkExportRelationships(
	req *v1.BulkExportRelationshipsRequest,
	resp grpc.ServerStreamingServer[v1.BulkExportRelationshipsResponse],
) error {
	ctx := resp.Context()

	rev, _, rerr := consistency.RevisionFromContext(ctx)
	if rerr != nil {
		return shared.RewriteErrorWithoutConfig(ctx, rerr)
	}

	ds := datastoremw.MustFromContext(ctx)
	return BulkExport(ctx, ds, es.maxBatchSize, req, rev, resp.Send)
}

// BulkExport implements the BulkExportRelationships API functionality. Given a datastore.Datastore, it will
// export stream via the sender all relationships matched by the incoming request.
// If no cursor is provided, it will fallback to the provided revision.
Expand Down Expand Up @@ -519,8 +434,6 @@ func BulkExport(ctx context.Context, ds datastore.Datastore, batchSize uint64, r
return nil
}

// maxBulkCheckCount is the maximum number of individual permission checks
// accepted in a single bulk request (this copy appears to be the one moved to
// bulkcheck.go in this commit — confirm against the full diff).
const maxBulkCheckCount = 10000

func (es *experimentalServer) BulkCheckPermission(ctx context.Context, req *v1.BulkCheckPermissionRequest) (*v1.BulkCheckPermissionResponse, error) {
convertedReq := toCheckBulkPermissionsRequest(req)
res, err := es.bulkChecker.checkBulkPermissions(ctx, convertedReq)
Expand Down
Loading

0 comments on commit 43189cb

Please sign in to comment.