Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix experimental LookupResources2 to shear the tree earlier on indirect permissions #2005

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
428 changes: 422 additions & 6 deletions internal/dispatch/graph/lookupresources2_test.go

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions internal/dispatch/graph/lookupresources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,12 @@ func TestMaxDepthLookup(t *testing.T) {
require.Error(err)
}

func joinTuples(first []*core.RelationTuple, second []*core.RelationTuple) []*core.RelationTuple {
return append(first, second...)
func joinTuples(first []*core.RelationTuple, others ...[]*core.RelationTuple) []*core.RelationTuple {
current := first
for _, second := range others {
current = append(current, second...)
}
return current
}

func genTuplesWithOffset(resourceName string, relation string, subjectName string, subjectID string, offset int, number int) []*core.RelationTuple {
Expand All @@ -357,11 +361,15 @@ func genSubjectTuples(resourceName string, relation string, subjectName string,
}

func genTuplesWithCaveat(resourceName string, relation string, subjectName string, subjectID string, caveatName string, context map[string]any, offset int, number int) []*core.RelationTuple {
return genTuplesWithCaveatAndSubjectRelation(resourceName, relation, subjectName, subjectID, "...", caveatName, context, offset, number)
}

func genTuplesWithCaveatAndSubjectRelation(resourceName string, relation string, subjectName string, subjectID string, subjectRelation string, caveatName string, context map[string]any, offset int, number int) []*core.RelationTuple {
tuples := make([]*core.RelationTuple, 0, number)
for i := 0; i < number; i++ {
tpl := &core.RelationTuple{
ResourceAndRelation: ONR(resourceName, fmt.Sprintf("%s-%d", resourceName, i+offset), relation),
Subject: ONR(subjectName, subjectID, "..."),
Subject: ONR(subjectName, subjectID, subjectRelation),
}
if caveatName != "" {
tpl = tuple.MustWithCaveat(tpl, caveatName, context)
Expand Down
4 changes: 2 additions & 2 deletions internal/graph/lookupresources2.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ func (crr *CursoredLookupResources2) redispatchOrReport(
}, stream)
}

// Otherwise, we need to dispatch and filter results by batch checking along the way.
return runDispatchAndChecker(
// Otherwise, we need to filter results by batch checking along the way before dispatching.
return runCheckerAndDispatch(
ctx,
parentRequest,
foundResources,
Expand Down
205 changes: 114 additions & 91 deletions internal/graph/lr2streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graph

import (
"context"
"strconv"
"sync"

"github.com/authzed/spicedb/internal/dispatch"
Expand All @@ -15,10 +16,10 @@ import (
"github.com/authzed/spicedb/pkg/typesystem"
)

// runDispatchAndChecker runs the dispatch and checker for a lookup resources call, and publishes
// the results to the parent stream. This function is responsible for handling the dispatching
// of the lookup resources call, and then checking the results to filter them.
func runDispatchAndChecker(
// runCheckerAndDispatch runs the dispatch and checker for a lookup resources call, and publishes
// the results to the parent stream. This function is responsible for handling checking the
// results to filter them, and then dispatching those found.
func runCheckerAndDispatch(
ctx context.Context,
parentReq ValidatedLookupResources2Request,
foundResources dispatchableResourcesSubjectMap2,
Expand All @@ -35,13 +36,19 @@ func runDispatchAndChecker(
// Only allow max one dispatcher and one checker to run concurrently.
concurrencyLimit = min(concurrencyLimit, 2)

rdc := &dispatchAndCheckRunner{
currentCheckIndex, err := ci.integerSectionValue()
if err != nil {
return err
}

rdc := &checkAndDispatchRunner{
parentRequest: parentReq,
foundResources: foundResources,
ci: ci,
parentStream: parentStream,
newSubjectType: newSubjectType,
filteredSubjectIDs: filteredSubjectIDs,
currentCheckIndex: currentCheckIndex,
entrypoint: entrypoint,
lrDispatcher: lrDispatcher,
checkDispatcher: checkDispatcher,
Expand All @@ -53,87 +60,53 @@ func runDispatchAndChecker(
return rdc.runAndWait()
}

type dispatchAndCheckRunner struct {
parentRequest ValidatedLookupResources2Request
foundResources dispatchableResourcesSubjectMap2
ci cursorInformation
parentStream dispatch.LookupResources2Stream
newSubjectType *core.RelationReference
type checkAndDispatchRunner struct {
parentRequest ValidatedLookupResources2Request
foundResources dispatchableResourcesSubjectMap2
ci cursorInformation
parentStream dispatch.LookupResources2Stream
newSubjectType *core.RelationReference
entrypoint typesystem.ReachabilityEntrypoint
lrDispatcher dispatch.LookupResources2
checkDispatcher dispatch.Check
dispatchChunkSize uint16

filteredSubjectIDs []string
entrypoint typesystem.ReachabilityEntrypoint
lrDispatcher dispatch.LookupResources2
checkDispatcher dispatch.Check
dispatchChunkSize uint16
currentCheckIndex int

taskrunner *taskrunner.TaskRunner

lock *sync.Mutex
}

func (rdc *dispatchAndCheckRunner) dispatchAndCollect(ctx context.Context, cursor *v1.Cursor) ([]*v1.DispatchLookupResources2Response, error) {
collectingStream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](ctx)
err := rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: rdc.parentRequest.ResourceRelation,
SubjectRelation: rdc.newSubjectType,
SubjectIds: rdc.filteredSubjectIDs,
TerminalSubject: rdc.parentRequest.TerminalSubject,
Metadata: &v1.ResolverMeta{
AtRevision: rdc.parentRequest.Revision.String(),
DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
},
OptionalCursor: cursor,
OptionalLimit: uint32(rdc.dispatchChunkSize),
}, collectingStream)
return collectingStream.Results(), err
}

func (rdc *dispatchAndCheckRunner) runDispatch(ctx context.Context, cursor *v1.Cursor) error {
rdc.lock.Lock()
if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()

collected, err := rdc.dispatchAndCollect(ctx, cursor)
if err != nil {
return err
}

if len(collected) == 0 {
return nil
}

// Kick off a worker to filter the results via a check and then publish what was found.
func (rdc *checkAndDispatchRunner) runAndWait() error {
// Kick off a check at the current cursor, to filter a portion of the initial results set.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runChecker(ctx, collected)
return rdc.runChecker(ctx, rdc.currentCheckIndex)
})

// Start another dispatch at the cursor of the last response, to run in the background
// and collect more results for filtering while the checker is running.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runDispatch(ctx, collected[len(collected)-1].AfterResponseCursor)
})

return nil
return rdc.taskrunner.Wait()
}

func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*v1.DispatchLookupResources2Response) error {
func (rdc *checkAndDispatchRunner) runChecker(ctx context.Context, startingIndex int) error {
rdc.lock.Lock()
if rdc.ci.limits.hasExhaustedLimit() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the limit should just be an atomic, and then you can get rid of the locking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not; we'd have to change the implementation and its use everywhere

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion was to change it to an atomic and update the callsites. That can be a followup though.

rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()

checkHints := make([]*v1.CheckHint, 0, len(collected))
resourceIDsToCheck := make([]string, 0, len(collected))
for _, resource := range collected {
resourceIDsToCheck = append(resourceIDsToCheck, resource.Resource.ResourceId)
endingIndex := min(startingIndex+int(rdc.dispatchChunkSize), len(rdc.filteredSubjectIDs))
resourceIDsToCheck := rdc.filteredSubjectIDs[startingIndex:endingIndex]
if len(resourceIDsToCheck) == 0 {
return nil
}

checkHints := make([]*v1.CheckHint, 0, len(resourceIDsToCheck))
for _, resourceID := range resourceIDsToCheck {
checkHint, err := hints.HintForEntrypoint(
rdc.entrypoint,
resource.Resource.ResourceId,
resourceID,
rdc.parentRequest.TerminalSubject,
&v1.ResourceCheckResult{
Membership: v1.ResourceCheckResult_MEMBER,
Expand All @@ -144,9 +117,10 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
checkHints = append(checkHints, checkHint)
}

// Batch check the results to filter to those visible and then publish just the visible resources.
// NOTE: we are checking the containing permission here, *not* the target relation, as
// the goal is to shear for the containing permission.
resultsByResourceID, checkMetadata, err := computed.ComputeBulkCheck(ctx, rdc.checkDispatcher, computed.CheckParameters{
ResourceType: rdc.parentRequest.ResourceRelation,
ResourceType: rdc.newSubjectType,
Subject: rdc.parentRequest.TerminalSubject,
CaveatContext: rdc.parentRequest.Context.AsMap(),
AtRevision: rdc.parentRequest.Revision,
Expand All @@ -158,10 +132,12 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
return err
}

// Publish any resources that are visible.
isFirstPublishCall := true
for _, resource := range collected {
result, ok := resultsByResourceID[resource.Resource.ResourceId]
adjustedResources := rdc.foundResources.cloneAsMutable()

// Dispatch any resources that are visible.
resourceIDToDispatch := make([]string, 0, len(resourceIDsToCheck))
for _, resourceID := range resourceIDsToCheck {
result, ok := resultsByResourceID[resourceID]
if !ok {
continue
}
Expand All @@ -171,19 +147,9 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
fallthrough

case v1.ResourceCheckResult_CAVEATED_MEMBER:
rdc.lock.Lock()
if err := publishResultToParentStream(resource, rdc.ci, rdc.foundResources, result.MissingExprFields, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
rdc.lock.Unlock()
return err
}

isFirstPublishCall = false

if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()
// Record any additional caveats missing from the check.
adjustedResources.withAdditionalMissingContextForDispatchedResourceID(resourceID, result.MissingExprFields)
resourceIDToDispatch = append(resourceIDToDispatch, resourceID)

case v1.ResourceCheckResult_NOT_MEMBER:
// Skip.
Expand All @@ -194,18 +160,74 @@ func (rdc *dispatchAndCheckRunner) runChecker(ctx context.Context, collected []*
}
}

if len(resourceIDToDispatch) > 0 {
// Schedule a dispatch of those resources.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runDispatch(ctx, resourceIDToDispatch, adjustedResources.asReadOnly(), checkMetadata, startingIndex)
})
}

// Start the next check chunk (if applicable).
nextIndex := startingIndex + len(resourceIDsToCheck)
if nextIndex < len(rdc.filteredSubjectIDs) {
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runChecker(ctx, nextIndex)
})
}

return nil
}

func (rdc *dispatchAndCheckRunner) runAndWait() error {
currentCursor := rdc.ci.currentCursor
func (rdc *checkAndDispatchRunner) runDispatch(
ctx context.Context,
resourceIDsToDispatch []string,
adjustedResources dispatchableResourcesSubjectMap2,
checkMetadata *v1.ResponseMeta,
startingIndex int,
) error {
rdc.lock.Lock()
if rdc.ci.limits.hasExhaustedLimit() {
rdc.lock.Unlock()
return nil
}
rdc.lock.Unlock()

// Kick off a dispatch at the current cursor.
rdc.taskrunner.Schedule(func(ctx context.Context) error {
return rdc.runDispatch(ctx, currentCursor)
// NOTE: Since we extracted a custom section from the cursor at the beginning of this run, we have to add
// the starting index to the cursor to ensure that the next run starts from the correct place, and we have
// to use the *updated* cursor below on the dispatch.
updatedCi, err := rdc.ci.withOutgoingSection(strconv.Itoa(startingIndex))
if err != nil {
return err
}
responsePartialCursor := updatedCi.responsePartialCursor()

// Dispatch to the parent resource type and publish any results found.
isFirstPublishCall := true

wrappedStream := dispatch.NewHandlingDispatchStream(ctx, func(result *v1.DispatchLookupResources2Response) error {
if err := ctx.Err(); err != nil {
return err
}

if err := publishResultToParentStream(result, rdc.ci, responsePartialCursor, adjustedResources, nil, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
return err
}
isFirstPublishCall = false
return nil
})

return rdc.taskrunner.Wait()
return rdc.lrDispatcher.DispatchLookupResources2(&v1.DispatchLookupResources2Request{
ResourceRelation: rdc.parentRequest.ResourceRelation,
SubjectRelation: rdc.newSubjectType,
SubjectIds: resourceIDsToDispatch,
TerminalSubject: rdc.parentRequest.TerminalSubject,
Metadata: &v1.ResolverMeta{
AtRevision: rdc.parentRequest.Revision.String(),
DepthRemaining: rdc.parentRequest.Metadata.DepthRemaining - 1,
},
OptionalCursor: updatedCi.currentCursor,
OptionalLimit: rdc.ci.limits.currentLimit,
}, wrappedStream)
}

// unfilteredLookupResourcesDispatchStreamForEntrypoint creates a new dispatch stream that wraps
Expand All @@ -227,7 +249,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
default:
}

if err := publishResultToParentStream(result, ci, foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
if err := publishResultToParentStream(result, ci, ci.responsePartialCursor(), foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
return err
}
isFirstPublishCall = false
Expand All @@ -242,6 +264,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
func publishResultToParentStream(
result *v1.DispatchLookupResources2Response,
ci cursorInformation,
responseCursor *v1.Cursor,
foundResources dispatchableResourcesSubjectMap2,
additionalMissingContext []string,
isFirstPublishCall bool,
Expand All @@ -261,7 +284,7 @@ func publishResultToParentStream(

// The cursor for the response is that of the parent response + the cursor from the result itself.
afterResponseCursor, err := combineCursors(
ci.responsePartialCursor(),
responseCursor,
result.AfterResponseCursor,
)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on it, but:

        mappedResource, err := foundResources.mapPossibleResource(result.Resource)
        ...
	missingContextParameters := mapz.NewSet(mappedResource.MissingContextParams...)
	missingContextParameters.Extend(result.Resource.MissingContextParams)
	missingContextParameters.Extend(additionalMissingContext)
	mappedResource.MissingContextParams = missingContextParameters.AsSlice()

Why not just have mappedResource.MissingContextParams be a set so that you can just do:

        mappedResource, err := foundResources.mapPossibleResource(result.Resource)
        mappedResource.MissingContextParams.Extend(result.Resource.MissingContextParams)
        mappedResource.MissingContextParams.Extend(additionalMissingContext)

and then convert to a slice when you need to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a proto so it can't be a set

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't comment on this either but:

	metadata := result.Metadata
	if isFirstPublishCall {
		metadata = addCallToResponseMetadata(metadata)
		metadata = combineResponseMetadata(metadata, additionalMetadata)
	} else {
		metadata = addAdditionalDepthRequired(metadata)
	}

addCallToResponseMetadata increments Dispatch and DepthRequired
addAdditionalDepthRequired increments just DepthRequired

Shouldn't depth and dispatch go hand in hand? Why is dispatch only incremented on the first publish for a stream - or alternatively, why is depth required incremented for messages on an existing stream?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No; we only increase the dispatch count on the first result lest we double count it (this is what caused the dispatch count bug in LR many moons ago)

Expand Down
Loading
Loading