Skip to content

Commit

Permalink
Remove the ONRTypeSet and create a new combined CheckDispatchSet
Browse files Browse the repository at this point in the history
This new set does all the tracking and mapping previously handled by the ONRTypeSet and a custom multimap, in a more compact and better tested implementation

This also allows us to avoid requiring all results for those redispatches that do not have caveats, even if one of the subject types does
  • Loading branch information
josephschorr committed Sep 3, 2024
1 parent 2171dc3 commit bb10dfe
Show file tree
Hide file tree
Showing 6 changed files with 616 additions and 262 deletions.
167 changes: 49 additions & 118 deletions internal/graph/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/authzed/spicedb/internal/taskrunner"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/genutil/mapz"
"github.com/authzed/spicedb/pkg/genutil/slicez"
nspkg "github.com/authzed/spicedb/pkg/namespace"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
Expand Down Expand Up @@ -296,11 +295,6 @@ func combineWithCheckHints(result CheckResult, req ValidatedCheckRequest) CheckR
return result
}

type directDispatch struct {
resourceType *core.RelationReference
resourceIds []string
}

func (cc *ConcurrentChecker) checkDirect(ctx context.Context, crc currentRequestContext, relation *core.Relation) CheckResult {
ctx, span := tracer.Start(ctx, "checkDirect")
defer span.End()
Expand Down Expand Up @@ -447,56 +441,30 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, crc currentRequest
defer it.Close()
queryCount += 1.0

// Find the subjects over which to dispatch.
subjectsToDispatch := tuple.NewONRByTypeSet()
relationshipsBySubjectONR := mapz.NewMultiMap[string, *core.RelationTuple]()
hasCaveats := false

// Build the set of subjects over which to dispatch, along with metadata for
// mapping over caveats (if any).
checksToDispatch := newCheckDispatchSet()
for tpl := it.Next(); tpl != nil; tpl = it.Next() {
if it.Err() != nil {
return checkResultError(NewCheckFailureErr(it.Err()), emptyMetadata)
}

spiceerrors.DebugAssert(func() bool { return tpl.Subject.Relation != Ellipsis }, "got a terminal for a non-terminal query")

// Add the subject as an object over which to dispatch.
subjectsToDispatch.Add(tpl.Subject)
relationshipsBySubjectONR.Add(tuple.StringONR(tpl.Subject), tpl)
if tpl.Caveat != nil && tpl.Caveat.CaveatName != "" {
hasCaveats = true
}
checksToDispatch.addForRelationship(tpl)
}
it.Close()

// Convert the subjects into batched requests.
// To simplify the logic, +1 is added to account for the situation where
// the number of elements is less than the chunk size, and spare us some annoying code.
expectedNumberOfChunks := subjectsToDispatch.ValueLen()/int(crc.dispatchChunkSize) + 1
toDispatch := make([]directDispatch, 0, expectedNumberOfChunks)
subjectsToDispatch.ForEachType(func(rr *core.RelationReference, resourceIds []string) {
chunkCount := 0.0
slicez.ForEachChunk(resourceIds, crc.dispatchChunkSize, func(resourceIdChunk []string) {
chunkCount++
toDispatch = append(toDispatch, directDispatch{
resourceType: rr,
resourceIds: resourceIdChunk,
})
})
dispatchChunkCountHistogram.Observe(chunkCount)
})

// If there are caveats on the incoming relationships, then we must require all results to be
// found, as we need to ensure that all caveats are used for building the final expression.
resultsSetting := crc.resultsSetting
if hasCaveats {
resultsSetting = v1.DispatchCheckRequest_REQUIRE_ALL_RESULTS
}

// Dispatch and map to the associated resource ID(s).
result := union(ctx, crc, toDispatch, func(ctx context.Context, crc currentRequestContext, dd directDispatch) CheckResult {
toDispatch := checksToDispatch.dispatchChunks(crc.dispatchChunkSize)
result := union(ctx, crc, toDispatch, func(ctx context.Context, crc currentRequestContext, dd checkDispatchChunk) CheckResult {
// If there are caveats on any of the incoming relationships for the subjects to dispatch, then we must require all
// results to be found, as we need to ensure that all caveats are used for building the final expression.
resultsSetting := crc.resultsSetting
if dd.hasIncomingCaveats {
resultsSetting = v1.DispatchCheckRequest_REQUIRE_ALL_RESULTS
}

childResult := cc.dispatch(ctx, crc, ValidatedCheckRequest{
&v1.DispatchCheckRequest{
ResourceRelation: dd.resourceType,
ResourceRelation: tuple.RelationReference(dd.resourceType.namespace, dd.resourceType.relation),
ResourceIds: dd.resourceIds,
Subject: crc.parentReq.Subject,
ResultsSetting: resultsSetting,
Expand All @@ -513,21 +481,24 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, crc currentRequest
return childResult
}

return mapFoundResources(childResult, dd.resourceType, relationshipsBySubjectONR)
return mapFoundResources(childResult, dd.resourceType, checksToDispatch)
}, cc.concurrencyLimit)

return combineResultWithFoundResources(result, foundResources)
}

func mapFoundResources(result CheckResult, resourceType *core.RelationReference, relationshipsBySubjectONR *mapz.MultiMap[string, *core.RelationTuple]) CheckResult {
func mapFoundResources(result CheckResult, resourceType relationRef, checksToDispatch *checkDispatchSet) CheckResult {
// Map any resources found to the parent resource IDs.
membershipSet := NewMembershipSet()
for foundResourceID, result := range result.Resp.ResultsByResourceId {
subjectKey := tuple.StringONRStrings(resourceType.Namespace, foundResourceID, resourceType.Relation)
resourceIDAndCaveats := checksToDispatch.mappingsForSubject(resourceType.namespace, foundResourceID, resourceType.relation)

spiceerrors.DebugAssert(func() bool {
return len(resourceIDAndCaveats) > 0
}, "found resource ID without associated caveats")

tuples, _ := relationshipsBySubjectONR.Get(subjectKey)
for _, relationTuple := range tuples {
membershipSet.AddMemberViaRelationship(relationTuple.ResourceAndRelation.ObjectId, result.Expression, relationTuple)
for _, riac := range resourceIDAndCaveats {
membershipSet.AddMemberWithParentCaveat(riac.resourceID, result.Expression, riac.caveat)
}
}

Expand Down Expand Up @@ -595,7 +566,7 @@ func (cc *ConcurrentChecker) runSetOperation(ctx context.Context, crc currentReq
}
}

func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, crc currentRequestContext, cu *core.ComputedUserset, rr *core.RelationReference, resourceIds []string) CheckResult {
func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, crc currentRequestContext, cu *core.ComputedUserset, rr *relationRef, resourceIds []string) CheckResult {
ctx, span := tracer.Start(ctx, cu.Relation)
defer span.End()

Expand All @@ -606,7 +577,7 @@ func (cc *ConcurrentChecker) checkComputedUserset(ctx context.Context, crc curre
return checkResultError(spiceerrors.MustBugf("computed userset for tupleset without tuples"), emptyMetadata)
}

startNamespace = rr.Namespace
startNamespace = rr.namespace
targetResourceIds = resourceIds
} else if cu.Object == core.ComputedUserset_TUPLE_OBJECT {
if rr != nil {
Expand Down Expand Up @@ -693,7 +664,7 @@ type ttu[T relation] interface {
type checkResultWithType struct {
CheckResult

relationType *core.RelationReference
relationType relationRef
}

func checkIntersectionTupleToUserset(
Expand All @@ -719,38 +690,21 @@ func checkIntersectionTupleToUserset(
}
defer it.Close()

subjectsToDispatch := tuple.NewONRByTypeSet()
relationshipsBySubjectONR := mapz.NewMultiMap[string, *core.RelationTuple]()
checksToDispatch := newCheckDispatchSet()
subjectsByResourceID := mapz.NewMultiMap[string, *core.ObjectAndRelation]()
for tpl := it.Next(); tpl != nil; tpl = it.Next() {
if it.Err() != nil {
return checkResultError(NewCheckFailureErr(it.Err()), emptyMetadata)
}

subjectsToDispatch.Add(tpl.Subject)
relationshipsBySubjectONR.Add(tuple.StringONR(tpl.Subject), tpl)
checksToDispatch.addForRelationship(tpl)
subjectsByResourceID.Add(tpl.ResourceAndRelation.ObjectId, tpl.Subject)
}
it.Close()

// Convert the subjects into batched requests.
// To simplify the logic, +1 is added to account for the situation where
// the number of elements is less than the chunk size, and spare us some annoying code.
expectedNumberOfChunks := uint16(subjectsToDispatch.ValueLen())/crc.dispatchChunkSize + 1
toDispatch := make([]directDispatch, 0, expectedNumberOfChunks)
subjectsToDispatch.ForEachType(func(rr *core.RelationReference, resourceIds []string) {
chunkCount := 0.0
slicez.ForEachChunk(resourceIds, crc.dispatchChunkSize, func(resourceIdChunk []string) {
chunkCount++
toDispatch = append(toDispatch, directDispatch{
resourceType: rr,
resourceIds: resourceIdChunk,
})
})
dispatchChunkCountHistogram.Observe(chunkCount)
})

if subjectsToDispatch.IsEmpty() {
toDispatch := checksToDispatch.dispatchChunks(crc.dispatchChunkSize)
if len(toDispatch) == 0 {
return noMembers()
}

Expand All @@ -766,8 +720,9 @@ func checkIntersectionTupleToUserset(
dispatchChunkSize: crc.dispatchChunkSize,
},
toDispatch,
func(ctx context.Context, crc currentRequestContext, dd directDispatch) checkResultWithType {
childResult := cc.checkComputedUserset(ctx, crc, ttu.GetComputedUserset(), dd.resourceType, dd.resourceIds)
func(ctx context.Context, crc currentRequestContext, dd checkDispatchChunk) checkResultWithType {
resourceType := dd.resourceType
childResult := cc.checkComputedUserset(ctx, crc, ttu.GetComputedUserset(), &resourceType, dd.resourceIds)
return checkResultWithType{
CheckResult: childResult,
relationType: dd.resourceType,
Expand All @@ -780,19 +735,18 @@ func checkIntersectionTupleToUserset(
}

// Create a membership set per-subject-type, representing the membership for each of the dispatched subjects.
resultsByDispatchedSubject := map[string]*MembershipSet{}
resultsByDispatchedSubject := map[relationRef]*MembershipSet{}
combinedMetadata := emptyMetadata
for _, result := range chunkResults {
if result.Err != nil {
return checkResultError(result.Err, emptyMetadata)
}

typeKey := tuple.StringRR(result.relationType)
if _, ok := resultsByDispatchedSubject[typeKey]; !ok {
resultsByDispatchedSubject[typeKey] = NewMembershipSet()
if _, ok := resultsByDispatchedSubject[result.relationType]; !ok {
resultsByDispatchedSubject[result.relationType] = NewMembershipSet()
}

resultsByDispatchedSubject[typeKey].UnionWith(result.Resp.ResultsByResourceId)
resultsByDispatchedSubject[result.relationType].UnionWith(result.Resp.ResultsByResourceId)
combinedMetadata = combineResponseMetadata(combinedMetadata, result.Resp.Metadata)
}

Expand All @@ -813,11 +767,7 @@ func checkIntersectionTupleToUserset(
// was found for each. If any are not found, then the resource ID is not a member.
// We also collect up the caveats for each subject, as they will be added to the final result.
for _, subject := range subjects {
subjectTypeKey := tuple.StringRR(&core.RelationReference{
Namespace: subject.Namespace,
Relation: subject.Relation,
})

subjectTypeKey := relationRef{subject.Namespace, subject.Relation}
results, ok := resultsByDispatchedSubject[subjectTypeKey]
if !ok {
hasAllSubjects = false
Expand All @@ -835,11 +785,10 @@ func checkIntersectionTupleToUserset(
}

// Add any caveats on the subject from the starting relationship(s) as well.
subjectKey := tuple.StringONR(subject)
tuples, _ := relationshipsBySubjectONR.Get(subjectKey)
for _, relationTuple := range tuples {
if relationTuple.Caveat != nil {
caveats = append(caveats, wrapCaveat(relationTuple.Caveat))
resourceIDAndCaveats := checksToDispatch.mappingsForSubject(subject.Namespace, subject.ObjectId, subject.Relation)
for _, riac := range resourceIDAndCaveats {
if riac.caveat != nil {
caveats = append(caveats, wrapCaveat(riac.caveat))
}
}
}
Expand Down Expand Up @@ -904,46 +853,28 @@ func checkTupleToUserset[T relation](
}
defer it.Close()

subjectsToDispatch := tuple.NewONRByTypeSet()
relationshipsBySubjectONR := mapz.NewMultiMap[string, *core.RelationTuple]()
checksToDispatch := newCheckDispatchSet()
for tpl := it.Next(); tpl != nil; tpl = it.Next() {
if it.Err() != nil {
return checkResultError(NewCheckFailureErr(it.Err()), emptyMetadata)
}

subjectsToDispatch.Add(tpl.Subject)
relationshipsBySubjectONR.Add(tuple.StringONR(tpl.Subject), tpl)
checksToDispatch.addForRelationship(tpl)
}
it.Close()

// Convert the subjects into batched requests.
// To simplify the logic, +1 is added to account for the situation where
// the number of elements is less than the chunk size, and spare us some annoying code.
expectedNumberOfChunks := uint16(subjectsToDispatch.ValueLen())/crc.dispatchChunkSize + 1
toDispatch := make([]directDispatch, 0, expectedNumberOfChunks)
subjectsToDispatch.ForEachType(func(rr *core.RelationReference, resourceIds []string) {
chunkCount := 0.0
slicez.ForEachChunk(resourceIds, crc.dispatchChunkSize, func(resourceIdChunk []string) {
chunkCount++
toDispatch = append(toDispatch, directDispatch{
resourceType: rr,
resourceIds: resourceIdChunk,
})
})
dispatchChunkCountHistogram.Observe(chunkCount)
})

toDispatch := checksToDispatch.dispatchChunks(crc.dispatchChunkSize)
return combineWithComputedHints(union(
ctx,
crc,
toDispatch,
func(ctx context.Context, crc currentRequestContext, dd directDispatch) CheckResult {
childResult := cc.checkComputedUserset(ctx, crc, ttu.GetComputedUserset(), dd.resourceType, dd.resourceIds)
func(ctx context.Context, crc currentRequestContext, dd checkDispatchChunk) CheckResult {
resourceType := dd.resourceType
childResult := cc.checkComputedUserset(ctx, crc, ttu.GetComputedUserset(), &resourceType, dd.resourceIds)
if childResult.Err != nil {
return childResult
}

return mapFoundResources(childResult, dd.resourceType, relationshipsBySubjectONR)
return mapFoundResources(childResult, dd.resourceType, checksToDispatch)
},
cc.concurrencyLimit,
), hintsToReturn)
Expand Down
Loading

0 comments on commit bb10dfe

Please sign in to comment.