Skip to content

Commit

Permalink
Switch experimental LookupResources2 to request additional chunks of …
Browse files Browse the repository at this point in the history
…dispatched resources when checking those already received from another dispatch

Adds some parallelism back into LR2
  • Loading branch information
josephschorr committed Jul 12, 2024
1 parent f2c40f4 commit 5668e71
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 240 deletions.
274 changes: 50 additions & 224 deletions internal/graph/lookupresources2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package graph

import (
"context"
"errors"
"slices"
"sort"

Expand Down Expand Up @@ -439,6 +438,11 @@ func hintString(resourceID string, entrypoint typesystem.ReachabilityEntrypoint,
return typesystem.CheckHint(resourceKey, terminalSubject), nil
}

type possibleResourceAndIndex struct {
resource *v1.PossibleResource
index int
}

// redispatchOrReport checks if further redispatching is necessary for the found resource
// type. If not, and the found resource type+relation matches the target resource type+relation,
// the resource is reported to the parent stream.
Expand Down Expand Up @@ -483,7 +487,14 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
return nil
}

filtered := offsetted
filtered := make([]possibleResourceAndIndex, 0, len(offsetted))
for index, resource := range offsetted {
filtered = append(filtered, possibleResourceAndIndex{
resource: resource,
index: index,
})
}

metadata := emptyMetadata

// If the entrypoint is not a direct result, issue a check to further filter the results on the intersection or exclusion.
Expand Down Expand Up @@ -517,25 +528,31 @@ func (crr *CursoredLookupResources2) redispatchOrReport(

metadata = addCallToResponseMetadata(checkMetadata)

filtered = make([]*v1.PossibleResource, 0, len(offsetted))
for _, resource := range offsetted {
filtered = make([]possibleResourceAndIndex, 0, len(offsetted))
for index, resource := range offsetted {
result, ok := resultsByResourceID[resource.ResourceId]
if !ok {
continue
}

switch result.Membership {
case v1.ResourceCheckResult_MEMBER:
filtered = append(filtered, resource)
filtered = append(filtered, possibleResourceAndIndex{
resource: resource,
index: index,
})

case v1.ResourceCheckResult_CAVEATED_MEMBER:
missingContextParams := mapz.NewSet(result.MissingExprFields...)
missingContextParams.Extend(resource.MissingContextParams)

filtered = append(filtered, &v1.PossibleResource{
ResourceId: resource.ResourceId,
ForSubjectIds: resource.ForSubjectIds,
MissingContextParams: missingContextParams.AsSlice(),
filtered = append(filtered, possibleResourceAndIndex{
resource: &v1.PossibleResource{
ResourceId: resource.ResourceId,
ForSubjectIds: resource.ForSubjectIds,
MissingContextParams: missingContextParams.AsSlice(),
},
index: index,
})

case v1.ResourceCheckResult_NOT_MEMBER:
Expand All @@ -547,15 +564,15 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
}
}

for index, resource := range filtered {
for _, resourceAndIndex := range filtered {
if !ci.limits.prepareForPublishing() {
return nil
}

err := parentStream.Publish(&v1.DispatchLookupResources2Response{
Resource: resource,
Resource: resourceAndIndex.resource,
Metadata: metadata,
AfterResponseCursor: nextCursorWith(currentOffset + index + 1),
AfterResponseCursor: nextCursorWith(currentOffset + resourceAndIndex.index + 1),
})
if err != nil {
return err
Expand All @@ -581,28 +598,11 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
return nil
}

// The stream that collects the results of the dispatch will add metadata to the response,
// map the results found based on the mapping data in the results and, if the entrypoint is not
// direct, issue a check to further filter the results.
currentCursor := ci.currentCursor

// Loop until we've produced enough results to satisfy the limit. This is necessary because
// the dispatch may return a set of results that, after checking, is less than the limit.
for {
stream, completed := lookupResourcesDispatchStreamForEntrypoint(ctx, foundResources, parentStream, entrypoint, ci, parentRequest, crr.dc)

// NOTE: if the entrypoint is a direct result, then all results returned by the dispatch will, themselves,
// be direct results. In this case, we can request the full limit of results. If the entrypoint is not a
// direct result, then we must request more than the limit in the hope that we get enough results to satisfy the
// limit after filtering.
var limit uint32 = uint32(datastore.FilterMaximumIDCount)
if entrypoint.IsDirectResult() {
limit = parentRequest.OptionalLimit
}

// Dispatch the found resources as the subjects for the next call, to continue the
// resolution.
err = crr.dl.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
// If the entrypoint is a direct result then we can simply dispatch directly and map
// all found results, as no further filtering will be needed.
if entrypoint.IsDirectResult() {
stream := unfilteredLookupResourcesDispatchStreamForEntrypoint(ctx, foundResources, parentStream, ci)
return crr.dl.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: parentRequest.ResourceRelation,
SubjectRelation: newSubjectType,
SubjectIds: filteredSubjectIDs,
Expand All @@ -611,198 +611,24 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
AtRevision: parentRequest.Revision.String(),
DepthRemaining: parentRequest.Metadata.DepthRemaining - 1,
},
OptionalCursor: currentCursor,
OptionalLimit: limit, // Request more than the limit to hopefully get enough results.
OptionalCursor: ci.currentCursor,
OptionalLimit: parentRequest.OptionalLimit,
}, stream)
if err != nil {
// If the dispatch was canceled due to the limit, do not treat it as an error.
if errors.Is(err, errCanceledBecauseLimitReached) {
return err
}
}

nextCursor, err := completed()
if err != nil {
return err
}

if nextCursor == nil || ci.limits.hasExhaustedLimit() {
break
}
currentCursor = nextCursor
}

return nil
// Otherwise, we need to dispatch and filter results by batch checking along the way.
return runDispatchAndChecker(
ctx,
parentRequest,
foundResources,
ci,
parentStream,
newSubjectType,
filteredSubjectIDs,
entrypoint,
crr.dl,
crr.dc,
crr.concurrencyLimit,
)
})
}

func lookupResourcesDispatchStreamForEntrypoint(
ctx context.Context,
foundResources dispatchableResourcesSubjectMap2,
parentStream dispatch.LookupResources2Stream,
entrypoint typesystem.ReachabilityEntrypoint,
ci cursorInformation,
parentRequest ValidatedLookupResources2Request,
dc dispatch.Check,
) (dispatch.LookupResources2Stream, func() (*v1.Cursor, error)) {
// Branch the context so that the dispatch can be canceled without canceling the parent
// call.
sctx, cancelDispatch := branchContext(ctx)

needsCallAddedToMetadata := true
resultsToCheck := make([]*v1.DispatchLookupResources2Response, 0, int(datastore.FilterMaximumIDCount))
var nextCursor *v1.Cursor

publishResultToParentStream := func(
result *v1.DispatchLookupResources2Response,
additionalMissingContext []string,
additionalMetadata *v1.ResponseMeta,
) error {
// Map the found resources via the subject+resources used for dispatching, to determine
// if any need to be made conditional due to caveats.
mappedResource, err := foundResources.mapPossibleResource(result.Resource)
if err != nil {
return err
}

if !ci.limits.prepareForPublishing() {
cancelDispatch(errCanceledBecauseLimitReached)
return nil
}

// The cursor for the response is that of the parent response + the cursor from the result itself.
afterResponseCursor, err := combineCursors(
ci.responsePartialCursor(),
result.AfterResponseCursor,
)
if err != nil {
return err
}

metadata := combineResponseMetadata(result.Metadata, additionalMetadata)

// Only the first dispatched result gets the call added to it. This is to prevent overcounting
// of the batched dispatch.
if needsCallAddedToMetadata {
metadata = addCallToResponseMetadata(metadata)
needsCallAddedToMetadata = false
} else {
metadata = addAdditionalDepthRequired(metadata)
}

missingContextParameters := mapz.NewSet(mappedResource.MissingContextParams...)
missingContextParameters.Extend(result.Resource.MissingContextParams)
missingContextParameters.Extend(additionalMissingContext)

mappedResource.MissingContextParams = missingContextParameters.AsSlice()

resp := &v1.DispatchLookupResources2Response{
Resource: mappedResource,
Metadata: metadata,
AfterResponseCursor: afterResponseCursor,
}

return parentStream.Publish(resp)
}

batchCheckAndPublishIfNecessary := func(result *v1.DispatchLookupResources2Response) error {
// Add the result to the list of results to check. If nil, this is the final call to check+publish.
if result != nil {
resultsToCheck = append(resultsToCheck, result)
}

// If we have not yet reached the maximum number of results to check and this is not the final
// call, return early.
if len(resultsToCheck) < int(datastore.FilterMaximumIDCount) && result != nil {
return nil
}

// Ensure there are items left to check.
if len(resultsToCheck) == 0 {
return nil
}

// Build the set of resource IDs to check and the hints to short circuit the check on the current entrypoint.
checkHints := make(map[string]*v1.ResourceCheckResult, len(resultsToCheck))
resourceIDsToCheck := make([]string, 0, len(resultsToCheck))
for _, resource := range resultsToCheck {
hintKey, err := hintString(resource.Resource.ResourceId, entrypoint, parentRequest.TerminalSubject)
if err != nil {
return err
}

resourceIDsToCheck = append(resourceIDsToCheck, resource.Resource.ResourceId)

checkHints[hintKey] = &v1.ResourceCheckResult{
Membership: v1.ResourceCheckResult_MEMBER,
}
}

// Batch check the results to filter to those visible and then publish just the visible resources.
resultsByResourceID, checkMetadata, err := computed.ComputeBulkCheck(ctx, dc, computed.CheckParameters{
ResourceType: parentRequest.ResourceRelation,
Subject: parentRequest.TerminalSubject,
CaveatContext: parentRequest.Context.AsMap(),
AtRevision: parentRequest.Revision,
MaximumDepth: parentRequest.Metadata.DepthRemaining - 1,
DebugOption: computed.NoDebugging,
CheckHints: checkHints,
}, resourceIDsToCheck)
if err != nil {
return err
}

metadata := checkMetadata
for _, resource := range resultsToCheck {
result, ok := resultsByResourceID[resource.Resource.ResourceId]
if !ok {
continue
}

switch result.Membership {
case v1.ResourceCheckResult_MEMBER:
fallthrough

case v1.ResourceCheckResult_CAVEATED_MEMBER:
if err := publishResultToParentStream(resource, result.MissingExprFields, metadata); err != nil {
return err
}
metadata = emptyMetadata

case v1.ResourceCheckResult_NOT_MEMBER:
// Skip.
continue

default:
return spiceerrors.MustBugf("unexpected result from check: %v", result.Membership)
}
}

resultsToCheck = make([]*v1.DispatchLookupResources2Response, 0, int(datastore.FilterMaximumIDCount))
return nil
}

wrappedStream := dispatch.NewHandlingDispatchStream(sctx, func(result *v1.DispatchLookupResources2Response) error {
select {
case <-ctx.Done():
return ctx.Err()

default:
}

nextCursor = result.AfterResponseCursor

// If the entrypoint is a direct result, simply publish the found resource.
if entrypoint.IsDirectResult() {
return publishResultToParentStream(result, nil, emptyMetadata)
}

// Otherwise, queue the result for checking and publishing if the check succeeds.
return batchCheckAndPublishIfNecessary(result)
})

return wrappedStream, func() (*v1.Cursor, error) {
defer cancelDispatch(nil)
return nextCursor, batchCheckAndPublishIfNecessary(nil)
}
}
Loading

0 comments on commit 5668e71

Please sign in to comment.