diff --git a/internal/graph/lookupresources2.go b/internal/graph/lookupresources2.go index 2e85e89a97..fff4da2b4a 100644 --- a/internal/graph/lookupresources2.go +++ b/internal/graph/lookupresources2.go @@ -2,7 +2,6 @@ package graph import ( "context" - "errors" "slices" "sort" @@ -439,6 +438,11 @@ func hintString(resourceID string, entrypoint typesystem.ReachabilityEntrypoint, return typesystem.CheckHint(resourceKey, terminalSubject), nil } +type possibleResourceAndIndex struct { + resource *v1.PossibleResource + index int +} + // redispatchOrReport checks if further redispatching is necessary for the found resource // type. If not, and the found resource type+relation matches the target resource type+relation, // the resource is reported to the parent stream. @@ -483,7 +487,14 @@ func (crr *CursoredLookupResources2) redispatchOrReport( return nil } - filtered := offsetted + filtered := make([]possibleResourceAndIndex, 0, len(offsetted)) + for index, resource := range offsetted { + filtered = append(filtered, possibleResourceAndIndex{ + resource: resource, + index: index, + }) + } + metadata := emptyMetadata // If the entrypoint is not a direct result, issue a check to further filter the results on the intersection or exclusion. @@ -517,8 +528,8 @@ func (crr *CursoredLookupResources2) redispatchOrReport( metadata = addCallToResponseMetadata(checkMetadata) - filtered = make([]*v1.PossibleResource, 0, len(offsetted)) - for _, resource := range offsetted { + filtered = make([]possibleResourceAndIndex, 0, len(offsetted)) + for index, resource := range offsetted { result, ok := resultsByResourceID[resource.ResourceId] if !ok { continue @@ -526,16 +537,22 @@ func (crr *CursoredLookupResources2) redispatchOrReport( switch result.Membership { case v1.ResourceCheckResult_MEMBER: - filtered = append(filtered, resource) + filtered = append(filtered, possibleResourceAndIndex{ + resource: resource, + index: index, + }) case v1.ResourceCheckResult_CAVEATED_MEMBER: missingContextParams := mapz.NewSet(result.MissingExprFields...) missingContextParams.Extend(resource.MissingContextParams) - filtered = append(filtered, &v1.PossibleResource{ - ResourceId: resource.ResourceId, - ForSubjectIds: resource.ForSubjectIds, - MissingContextParams: missingContextParams.AsSlice(), + filtered = append(filtered, possibleResourceAndIndex{ + resource: &v1.PossibleResource{ + ResourceId: resource.ResourceId, + ForSubjectIds: resource.ForSubjectIds, + MissingContextParams: missingContextParams.AsSlice(), + }, + index: index, }) case v1.ResourceCheckResult_NOT_MEMBER: @@ -547,15 +564,15 @@ func (crr *CursoredLookupResources2) redispatchOrReport( } } - for index, resource := range filtered { + for _, resourceAndIndex := range filtered { if !ci.limits.prepareForPublishing() { return nil } err := parentStream.Publish(&v1.DispatchLookupResources2Response{ - Resource: resource, + Resource: resourceAndIndex.resource, Metadata: metadata, - AfterResponseCursor: nextCursorWith(currentOffset + index + 1), + AfterResponseCursor: nextCursorWith(currentOffset + resourceAndIndex.index + 1), }) if err != nil { return err @@ -581,28 +598,11 @@ func (crr *CursoredLookupResources2) redispatchOrReport( return nil } - // The stream that collects the results of the dispatch will add metadata to the response, - // map the results found based on the mapping data in the results and, if the entrypoint is not - // direct, issue a check to further filter the results. - currentCursor := ci.currentCursor - - // Loop until we've produced enough results to satisfy the limit. This is necessary because - // the dispatch may return a set of results that, after checking, is less than the limit. - for { - stream, completed := lookupResourcesDispatchStreamForEntrypoint(ctx, foundResources, parentStream, entrypoint, ci, parentRequest, crr.dc) - - // NOTE: if the entrypoint is a direct result, then all results returned by the dispatch will, themselves, - // be direct results. In this case, we can request the full limit of results. If the entrypoint is not a - // direct result, then we must request more than the limit in the hope that we get enough results to satisfy the - // limit after filtering. - var limit uint32 = uint32(datastore.FilterMaximumIDCount) - if entrypoint.IsDirectResult() { - limit = parentRequest.OptionalLimit - } - - // Dispatch the found resources as the subjects for the next call, to continue the - // resolution. - err = crr.dl.DispatchLookupResources2(&v1.DispatchLookupResources2Request{ + // If the entrypoint is a direct result then we can simply dispatch directly and map + // all found results, as no further filtering will be needed. + if entrypoint.IsDirectResult() { + stream := unfilteredLookupResourcesDispatchStreamForEntrypoint(ctx, foundResources, parentStream, ci) + return crr.dl.DispatchLookupResources2(&v1.DispatchLookupResources2Request{ ResourceRelation: parentRequest.ResourceRelation, SubjectRelation: newSubjectType, SubjectIds: filteredSubjectIDs, @@ -611,198 +611,24 @@ func (crr *CursoredLookupResources2) redispatchOrReport( AtRevision: parentRequest.Revision.String(), DepthRemaining: parentRequest.Metadata.DepthRemaining - 1, }, - OptionalCursor: currentCursor, - OptionalLimit: limit, // Request more than the limit to hopefully get enough results. + OptionalCursor: ci.currentCursor, + OptionalLimit: parentRequest.OptionalLimit, }, stream) - if err != nil { - // If the dispatch was canceled due to the limit, do not treat it as an error. - if errors.Is(err, errCanceledBecauseLimitReached) { - return err - } - } - - nextCursor, err := completed() - if err != nil { - return err - } - - if nextCursor == nil || ci.limits.hasExhaustedLimit() { - break - } - currentCursor = nextCursor } - return nil + // Otherwise, we need to dispatch and filter results by batch checking along the way. + return runDispatchAndChecker( + ctx, + parentRequest, + foundResources, + ci, + parentStream, + newSubjectType, + filteredSubjectIDs, + entrypoint, + crr.dl, + crr.dc, + crr.concurrencyLimit, + ) }) } - -func lookupResourcesDispatchStreamForEntrypoint( - ctx context.Context, - foundResources dispatchableResourcesSubjectMap2, - parentStream dispatch.LookupResources2Stream, - entrypoint typesystem.ReachabilityEntrypoint, - ci cursorInformation, - parentRequest ValidatedLookupResources2Request, - dc dispatch.Check, -) (dispatch.LookupResources2Stream, func() (*v1.Cursor, error)) { - // Branch the context so that the dispatch can be canceled without canceling the parent - // call. - sctx, cancelDispatch := branchContext(ctx) - - needsCallAddedToMetadata := true - resultsToCheck := make([]*v1.DispatchLookupResources2Response, 0, int(datastore.FilterMaximumIDCount)) - var nextCursor *v1.Cursor - - publishResultToParentStream := func( - result *v1.DispatchLookupResources2Response, - additionalMissingContext []string, - additionalMetadata *v1.ResponseMeta, - ) error { - // Map the found resources via the subject+resources used for dispatching, to determine - // if any need to be made conditional due to caveats. - mappedResource, err := foundResources.mapPossibleResource(result.Resource) - if err != nil { - return err - } - - if !ci.limits.prepareForPublishing() { - cancelDispatch(errCanceledBecauseLimitReached) - return nil - } - - // The cursor for the response is that of the parent response + the cursor from the result itself. - afterResponseCursor, err := combineCursors( - ci.responsePartialCursor(), - result.AfterResponseCursor, - ) - if err != nil { - return err - } - - metadata := combineResponseMetadata(result.Metadata, additionalMetadata) - - // Only the first dispatched result gets the call added to it. This is to prevent overcounting - // of the batched dispatch. - if needsCallAddedToMetadata { - metadata = addCallToResponseMetadata(metadata) - needsCallAddedToMetadata = false - } else { - metadata = addAdditionalDepthRequired(metadata) - } - - missingContextParameters := mapz.NewSet(mappedResource.MissingContextParams...) - missingContextParameters.Extend(result.Resource.MissingContextParams) - missingContextParameters.Extend(additionalMissingContext) - - mappedResource.MissingContextParams = missingContextParameters.AsSlice() - - resp := &v1.DispatchLookupResources2Response{ - Resource: mappedResource, - Metadata: metadata, - AfterResponseCursor: afterResponseCursor, - } - - return parentStream.Publish(resp) - } - - batchCheckAndPublishIfNecessary := func(result *v1.DispatchLookupResources2Response) error { - // Add the result to the list of results to check. If nil, this is the final call to check+publish. - if result != nil { - resultsToCheck = append(resultsToCheck, result) - } - - // If we have not yet reached the maximum number of results to check and this is not the final - // call, return early. - if len(resultsToCheck) < int(datastore.FilterMaximumIDCount) && result != nil { - return nil - } - - // Ensure there are items left to check. - if len(resultsToCheck) == 0 { - return nil - } - - // Build the set of resource IDs to check and the hints to short circuit the check on the current entrypoint. - checkHints := make(map[string]*v1.ResourceCheckResult, len(resultsToCheck)) - resourceIDsToCheck := make([]string, 0, len(resultsToCheck)) - for _, resource := range resultsToCheck { - hintKey, err := hintString(resource.Resource.ResourceId, entrypoint, parentRequest.TerminalSubject) - if err != nil { - return err - } - - resourceIDsToCheck = append(resourceIDsToCheck, resource.Resource.ResourceId) - - checkHints[hintKey] = &v1.ResourceCheckResult{ - Membership: v1.ResourceCheckResult_MEMBER, - } - } - - // Batch check the results to filter to those visible and then publish just the visible resources. - resultsByResourceID, checkMetadata, err := computed.ComputeBulkCheck(ctx, dc, computed.CheckParameters{ - ResourceType: parentRequest.ResourceRelation, - Subject: parentRequest.TerminalSubject, - CaveatContext: parentRequest.Context.AsMap(), - AtRevision: parentRequest.Revision, - MaximumDepth: parentRequest.Metadata.DepthRemaining - 1, - DebugOption: computed.NoDebugging, - CheckHints: checkHints, - }, resourceIDsToCheck) - if err != nil { - return err - } - - metadata := checkMetadata - for _, resource := range resultsToCheck { - result, ok := resultsByResourceID[resource.Resource.ResourceId] - if !ok { - continue - } - - switch result.Membership { - case v1.ResourceCheckResult_MEMBER: - fallthrough - - case v1.ResourceCheckResult_CAVEATED_MEMBER: - if err := publishResultToParentStream(resource, result.MissingExprFields, metadata); err != nil { - return err - } - metadata = emptyMetadata - - case v1.ResourceCheckResult_NOT_MEMBER: - // Skip. - continue - - default: - return spiceerrors.MustBugf("unexpected result from check: %v", result.Membership) - } - } - - resultsToCheck = make([]*v1.DispatchLookupResources2Response, 0, int(datastore.FilterMaximumIDCount)) - return nil - } - - wrappedStream := dispatch.NewHandlingDispatchStream(sctx, func(result *v1.DispatchLookupResources2Response) error { - select { - case <-ctx.Done(): - return ctx.Err() - - default: - } - - nextCursor = result.AfterResponseCursor - - // If the entrypoint is a direct result, simply publish the found resource. - if entrypoint.IsDirectResult() { - return publishResultToParentStream(result, nil, emptyMetadata) - } - - // Otherwise, queue the result for checking and publishing if the check succeeds. - return batchCheckAndPublishIfNecessary(result) - }) - - return wrappedStream, func() (*v1.Cursor, error) { - defer cancelDispatch(nil) - return nextCursor, batchCheckAndPublishIfNecessary(nil) - } -} diff --git a/internal/graph/lr2streams.go b/internal/graph/lr2streams.go new file mode 100644 index 0000000000..65455f4004 --- /dev/null +++ b/internal/graph/lr2streams.go @@ -0,0 +1,286 @@ +package graph + +import ( + "context" + "sync" + + "github.com/authzed/spicedb/internal/dispatch" + "github.com/authzed/spicedb/internal/graph/computed" + "github.com/authzed/spicedb/internal/taskrunner" + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/genutil/mapz" + core "github.com/authzed/spicedb/pkg/proto/core/v1" + v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" + "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/typesystem" +) + +// runDispatchAndChecker runs the dispatch and checker for a lookup resources call, and publishes +// the results to the parent stream. This function is responsible for handling the dispatching +// of the lookup resources call, and then checking the results to filter them. +func runDispatchAndChecker( + ctx context.Context, + parentReq ValidatedLookupResources2Request, + foundResources dispatchableResourcesSubjectMap2, + ci cursorInformation, + parentStream dispatch.LookupResources2Stream, + newSubjectType *core.RelationReference, + filteredSubjectIDs []string, + entrypoint typesystem.ReachabilityEntrypoint, + lrDispatcher dispatch.LookupResources2, + checkDispatcher dispatch.Check, + concurrencyLimit uint16, +) error { + // Only allow max one dispatcher and one checker to run concurrently. + concurrencyLimit = min(concurrencyLimit, 2) + + rdc := &rdc{ + parentRequest: parentReq, + foundResources: foundResources, + ci: ci, + parentStream: parentStream, + newSubjectType: newSubjectType, + filteredSubjectIDs: filteredSubjectIDs, + entrypoint: entrypoint, + lrDispatcher: lrDispatcher, + checkDispatcher: checkDispatcher, + taskrunner: taskrunner.NewTaskRunner(ctx, concurrencyLimit), + lock: &sync.Mutex{}, + } + + return rdc.runAndWait() +} + +type rdc struct { + parentRequest ValidatedLookupResources2Request + foundResources dispatchableResourcesSubjectMap2 + ci cursorInformation + parentStream dispatch.LookupResources2Stream + newSubjectType *core.RelationReference + filteredSubjectIDs []string + entrypoint typesystem.ReachabilityEntrypoint + lrDispatcher dispatch.LookupResources2 + checkDispatcher dispatch.Check + + taskrunner *taskrunner.TaskRunner + + lock *sync.Mutex +} + +func (rdc *rdc) dispatchAndCollect(ctx context.Context, cursor *v1.Cursor) ([]*v1.DispatchLookupResources2Response, error) { + collectingStream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](ctx) + err := rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{ + ResourceRelation: rdc.parentRequest.ResourceRelation, + SubjectRelation: rdc.newSubjectType, + SubjectIds: rdc.filteredSubjectIDs, + TerminalSubject: rdc.parentRequest.TerminalSubject, + Metadata: &v1.ResolverMeta{ + AtRevision: rdc.parentRequest.Revision.String(), + DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1, + }, + OptionalCursor: cursor, + OptionalLimit: uint32(datastore.FilterMaximumIDCount), + }, collectingStream) + return collectingStream.Results(), err +} + +func (rdc *rdc) runDispatch(ctx context.Context, cursor *v1.Cursor) error { + rdc.lock.Lock() + if rdc.ci.limits.hasExhaustedLimit() { + rdc.lock.Unlock() + return nil + } + rdc.lock.Unlock() + + collected, err := rdc.dispatchAndCollect(ctx, cursor) + if err != nil { + return err + } + + if len(collected) == 0 { + return nil + } + + // Kick off a worker to filter the results via a check and then publish what was found. + rdc.taskrunner.Schedule(func(ctx context.Context) error { + return rdc.runChecker(ctx, collected) + }) + + // Start another dispatch at the cursor of the last response, to run in the background + // and collect more results for filtering while the checker is running. + rdc.taskrunner.Schedule(func(ctx context.Context) error { + return rdc.runDispatch(ctx, collected[len(collected)-1].AfterResponseCursor) + }) + + return nil +} + +func (rdc *rdc) runChecker(ctx context.Context, collected []*v1.DispatchLookupResources2Response) error { + rdc.lock.Lock() + if rdc.ci.limits.hasExhaustedLimit() { + rdc.lock.Unlock() + return nil + } + rdc.lock.Unlock() + + checkHints := make(map[string]*v1.ResourceCheckResult, len(collected)) + resourceIDsToCheck := make([]string, 0, len(collected)) + for _, resource := range collected { + hintKey, err := hintString(resource.Resource.ResourceId, rdc.entrypoint, rdc.parentRequest.TerminalSubject) + if err != nil { + return err + } + + resourceIDsToCheck = append(resourceIDsToCheck, resource.Resource.ResourceId) + + checkHints[hintKey] = &v1.ResourceCheckResult{ + Membership: v1.ResourceCheckResult_MEMBER, + } + } + + // Batch check the results to filter to those visible and then publish just the visible resources. + resultsByResourceID, checkMetadata, err := computed.ComputeBulkCheck(ctx, rdc.checkDispatcher, computed.CheckParameters{ + ResourceType: rdc.parentRequest.ResourceRelation, + Subject: rdc.parentRequest.TerminalSubject, + CaveatContext: rdc.parentRequest.Context.AsMap(), + AtRevision: rdc.parentRequest.Revision, + MaximumDepth: rdc.parentRequest.Metadata.DepthRemaining - 1, + DebugOption: computed.NoDebugging, + CheckHints: checkHints, + }, resourceIDsToCheck) + if err != nil { + return err + } + + // Publish any resources that are visible. + isFirstPublishCall := true + for _, resource := range collected { + result, ok := resultsByResourceID[resource.Resource.ResourceId] + if !ok { + continue + } + + switch result.Membership { + case v1.ResourceCheckResult_MEMBER: + fallthrough + + case v1.ResourceCheckResult_CAVEATED_MEMBER: + rdc.lock.Lock() + if err := publishResultToParentStream(resource, rdc.ci, rdc.foundResources, result.MissingExprFields, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil { + rdc.lock.Unlock() + return err + } + + isFirstPublishCall = false + + if rdc.ci.limits.hasExhaustedLimit() { + rdc.lock.Unlock() + return nil + } + rdc.lock.Unlock() + + case v1.ResourceCheckResult_NOT_MEMBER: + // Skip. + continue + + default: + return spiceerrors.MustBugf("unexpected result from check: %v", result.Membership) + } + } + + return nil +} + +func (rdc *rdc) runAndWait() error { + currentCursor := rdc.ci.currentCursor + + // Kick off a dispatch at the current cursor. + rdc.taskrunner.Schedule(func(ctx context.Context) error { + return rdc.runDispatch(ctx, currentCursor) + }) + + return rdc.taskrunner.Wait() +} + +// unfilteredLookupResourcesDispatchStreamForEntrypoint creates a new dispatch stream that wraps +// the parent stream, and publishes the results of the lookup resources call to the parent stream, +// mapped via foundResources. +func unfilteredLookupResourcesDispatchStreamForEntrypoint( + ctx context.Context, + foundResources dispatchableResourcesSubjectMap2, + parentStream dispatch.LookupResources2Stream, + ci cursorInformation, +) dispatch.LookupResources2Stream { + isFirstPublishCall := true + + wrappedStream := dispatch.NewHandlingDispatchStream(ctx, func(result *v1.DispatchLookupResources2Response) error { + select { + case <-ctx.Done(): + return ctx.Err() + + default: + } + + if err := publishResultToParentStream(result, ci, foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil { + return err + } + isFirstPublishCall = false + return nil + }) + + return wrappedStream +} + +// publishResultToParentStream publishes the result of a lookup resources call to the parent stream, +// mapped via foundResources. +func publishResultToParentStream( + result *v1.DispatchLookupResources2Response, + ci cursorInformation, + foundResources dispatchableResourcesSubjectMap2, + additionalMissingContext []string, + isFirstPublishCall bool, + additionalMetadata *v1.ResponseMeta, + parentStream dispatch.LookupResources2Stream, +) error { + // Map the found resources via the subject+resources used for dispatching, to determine + // if any need to be made conditional due to caveats. + mappedResource, err := foundResources.mapPossibleResource(result.Resource) + if err != nil { + return err + } + + if !ci.limits.prepareForPublishing() { + return nil + } + + // The cursor for the response is that of the parent response + the cursor from the result itself. + afterResponseCursor, err := combineCursors( + ci.responsePartialCursor(), + result.AfterResponseCursor, + ) + if err != nil { + return err + } + + metadata := result.Metadata + if isFirstPublishCall { + metadata = addCallToResponseMetadata(metadata) + metadata = combineResponseMetadata(metadata, additionalMetadata) + } else { + metadata = addAdditionalDepthRequired(metadata) + } + + missingContextParameters := mapz.NewSet(mappedResource.MissingContextParams...) + missingContextParameters.Extend(result.Resource.MissingContextParams) + missingContextParameters.Extend(additionalMissingContext) + + mappedResource.MissingContextParams = missingContextParameters.AsSlice() + + resp := &v1.DispatchLookupResources2Response{ + Resource: mappedResource, + Metadata: metadata, + AfterResponseCursor: afterResponseCursor, + } + + return parentStream.Publish(resp) +} diff --git a/internal/services/steelthreadtesting/operations.go b/internal/services/steelthreadtesting/operations.go index 41c9c833dd..5537b72a4d 100644 --- a/internal/services/steelthreadtesting/operations.go +++ b/internal/services/steelthreadtesting/operations.go @@ -216,7 +216,7 @@ func cursoredLookupResources(parameters map[string]any, client v1.PermissionsSer } if count != parameters["page_size"].(int) { - return nil, fmt.Errorf("expected full page size of %d for page #%d, got %d", parameters["page_size"].(int), index, count) + return nil, fmt.Errorf("expected full page size of %d for page #%d (of %d), got %d\npage sizes: %v", parameters["page_size"].(int), index, len(resultCounts), count, resultCounts) } } diff --git a/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-16-results.yaml b/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-16-results.yaml index fae8d51ba9..0292a6a3a2 100644 --- a/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-16-results.yaml +++ b/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-16-results.yaml @@ -79,9 +79,6 @@ - 'doc-79' - 'doc-8' - 'doc-9' -- - 'doc-9' - - 'public-doc-0' +- - 'public-doc-0' - 'public-doc-1' - 'public-doc-3' -- - 'public-doc-1' - - 'public-doc-3' diff --git a/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-5-results.yaml b/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-5-results.yaml index 6f3175bf97..9378e5b804 100644 --- a/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-5-results.yaml +++ b/internal/services/steelthreadtesting/steelresults/basic-lookup-resources-indirect-without-other-permission-page-size-5-results.yaml @@ -79,17 +79,6 @@ - 'doc-79' - 'doc-8' - 'doc-9' -- - 'doc-9' - - 'public-doc-0' - - 'public-doc-1' - - 'public-doc-3' -- - 'doc-9' - - 'public-doc-0' - - 'public-doc-1' - - 'public-doc-3' -- - 'public-doc-0' - - 'public-doc-1' - - 'public-doc-3' - - 'public-doc-0' - 'public-doc-1' - 'public-doc-3'