From 543e6e047c41b8f7515965526fc78cd3b49410dc Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 18 Sep 2024 15:00:50 -0400 Subject: [PATCH] Add support for secondary dispatching on LR2 --- internal/dispatch/remote/cluster.go | 180 +++++++++++--- internal/dispatch/remote/cluster_test.go | 295 ++++++++++++++++++++++- 2 files changed, 442 insertions(+), 33 deletions(-) diff --git a/internal/dispatch/remote/cluster.go b/internal/dispatch/remote/cluster.go index 52305a1a11..f41ad999be 100644 --- a/internal/dispatch/remote/cluster.go +++ b/internal/dispatch/remote/cluster.go @@ -216,6 +216,150 @@ func dispatchRequest[Q requestMessage, S responseMessage](ctx context.Context, c return *new(S), foundError } +type requestMessageWithCursor interface { + requestMessage + GetOptionalCursor() *v1.Cursor +} + +type responseMessageWithCursor interface { + responseMessage + GetAfterResponseCursor() *v1.Cursor +} + +type receiver[S responseMessage] interface { + Recv() (S, error) + grpc.ClientStream +} + +const ( + secondaryCursorPrefix = "$$secondary:" + primaryDispatcher = "" +) + +func publishClient[Q requestMessageWithCursor, R responseMessageWithCursor](ctx context.Context, client receiver[R], stream dispatch.Stream[R], secondaryDispatchName string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + + default: + result, err := client.Recv() + if errors.Is(err, io.EOF) { + return nil + } else if err != nil { + return err + } + + merr := adjustMetadataForDispatch(result.GetMetadata()) + if merr != nil { + return merr + } + + if secondaryDispatchName != primaryDispatcher { + afterResponseCursor := result.GetAfterResponseCursor() + if afterResponseCursor == nil { + return spiceerrors.MustBugf("received a nil after response cursor for secondary dispatch") + } + afterResponseCursor.Sections = append([]string{secondaryCursorPrefix + secondaryDispatchName}, afterResponseCursor.Sections...) + } + + serr := stream.Publish(result) + if serr != nil { + return serr + } + } + } +} + +// dispatchStreamingRequest handles the dispatching of a streaming request to the primary and any +// secondary dispatchers. Unlike the non-streaming version, this will first attempt to dispatch +// from the allowed secondary dispatchers before falling back to the primary, rather than running +// them in parallel. +func dispatchStreamingRequest[Q requestMessageWithCursor, R responseMessageWithCursor]( + ctx context.Context, + cr *clusterDispatcher, + reqKey string, + req Q, + stream dispatch.Stream[R], + handler func(context.Context, ClusterClient) (receiver[R], error), +) error { + withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout) + defer cancelFn() + + client, err := handler(withTimeout, cr.clusterClient) + if err != nil { + return err + } + + // Check the cursor to see if the dispatch went to one of the secondary endpoints. + cursor := req.GetOptionalCursor() + cursorLockedSecondaryName := "" + if cursor != nil && len(cursor.Sections) > 0 { + if strings.HasPrefix(cursor.Sections[0], secondaryCursorPrefix) { + cursorLockedSecondaryName = strings.TrimPrefix(cursor.Sections[0], secondaryCursorPrefix) + cursor.Sections = cursor.Sections[1:] + } + } + + // If no secondary dispatches are defined, just invoke directly. + if len(cr.secondaryDispatchExprs) == 0 || len(cr.secondaryDispatch) == 0 { + return publishClient[Q](withTimeout, client, stream, primaryDispatcher) + } + + // If the cursor is locked to a known secondary, dispatch to it. + if cursorLockedSecondaryName != "" { + secondary, ok := cr.secondaryDispatch[cursorLockedSecondaryName] + if ok { + secondaryClient, err := handler(withTimeout, secondary.Client) + if err != nil { + return err + } + + log.Debug().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher based on cursor") + return publishClient[Q](withTimeout, secondaryClient, stream, cursorLockedSecondaryName) + } + + return fmt.Errorf("unknown secondary dispatcher in cursor: %s", cursorLockedSecondaryName) + } + + // Otherwise, look for a matching expression for the initial secondary dispatch + // and, if present, try to dispatch to it. + expr, ok := cr.secondaryDispatchExprs[reqKey] + if !ok { + return publishClient[Q](withTimeout, client, stream, primaryDispatcher) + } + + result, err := RunDispatchExpr(expr, req) + if err != nil { + log.Warn().Err(err).Msg("error when trying to evaluate the dispatch expression") + } + + for _, secondaryDispatchName := range result { + secondary, ok := cr.secondaryDispatch[secondaryDispatchName] + if !ok { + log.Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher") + continue + } + + log.Trace().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher") + secondaryClient, err := handler(withTimeout, secondary.Client) + if err != nil { + log.Warn().Str("secondary-dispatcher", secondary.Name).Err(err).Msg("failed to create secondary dispatch client") + continue + } + + if err := publishClient[Q](withTimeout, secondaryClient, stream, secondaryDispatchName); err != nil { + log.Warn().Str("secondary-dispatcher", secondary.Name).Err(err).Msg("failed to publish secondary dispatch response") + continue + } + + return nil + } + + // Fallback: use the primary client if no secondary matched. + return publishClient[Q](withTimeout, client, stream, primaryDispatcher) +} + func adjustMetadataForDispatch(metadata *v1.ResponseMeta) error { if metadata == nil { return spiceerrors.MustBugf("received a nil metadata") @@ -370,38 +514,10 @@ func (cr *clusterDispatcher) DispatchLookupResources2( return err } - withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout) - defer cancelFn() - - client, err := cr.clusterClient.DispatchLookupResources2(withTimeout, req) - if err != nil { - return err - } - - for { - select { - case <-withTimeout.Done(): - return withTimeout.Err() - - default: - result, err := client.Recv() - if errors.Is(err, io.EOF) { - return nil - } else if err != nil { - return err - } - - merr := adjustMetadataForDispatch(result.Metadata) - if merr != nil { - return merr - } - - serr := stream.Publish(result) - if serr != nil { - return serr - } - } - } + return dispatchStreamingRequest(ctx, cr, "lookupresources", req, stream, + func(ctx context.Context, client ClusterClient) (receiver[*v1.DispatchLookupResources2Response], error) { + return client.DispatchLookupResources2(ctx, req) + }) } func (cr *clusterDispatcher) DispatchLookupSubjects( diff --git a/internal/dispatch/remote/cluster_test.go b/internal/dispatch/remote/cluster_test.go index 5f4df726e6..58c7ee2115 100644 --- a/internal/dispatch/remote/cluster_test.go +++ b/internal/dispatch/remote/cluster_test.go @@ -44,6 +44,23 @@ func (fds *fakeDispatchSvc) DispatchLookupSubjects(_ *v1.DispatchLookupSubjectsR }) } +func (fds *fakeDispatchSvc) DispatchLookupResources2(_ *v1.DispatchLookupResources2Request, srv v1.DispatchService_DispatchLookupResources2Server) error { + if fds.dispatchCount == 999 { + return fmt.Errorf("error") + } + + time.Sleep(fds.sleepTime) + return srv.Send(&v1.DispatchLookupResources2Response{ + Metadata: &v1.ResponseMeta{ + DispatchCount: fds.dispatchCount, + }, + AfterResponseCursor: &v1.Cursor{ + Sections: nil, + DispatchVersion: 1, + }, + }) +} + func TestDispatchTimeout(t *testing.T) { for _, tc := range []struct { timeout time.Duration @@ -133,7 +150,7 @@ func TestDispatchTimeout(t *testing.T) { } } -func TestSecondaryDispatch(t *testing.T) { +func TestCheckSecondaryDispatch(t *testing.T) { for _, tc := range []struct { name string expr string @@ -242,6 +259,282 @@ func TestSecondaryDispatch(t *testing.T) { } } +func TestLRSecondaryDispatch(t *testing.T) { + for _, tc := range []struct { + name string + expr string + request *v1.DispatchLookupResources2Request + expectedDispatchCount uint32 + expectedError bool + }{ + { + "no multidispatch", + "['invalid']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 1, + false, + }, + { + "valid multidispatch", + "['secondary']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 2, + false, + }, + { + "cursored multidispatch to invalid secondary", + "['secondary']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + OptionalCursor: &v1.Cursor{ + Sections: []string{"somethingelse"}, + DispatchVersion: 1, + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 2, // Falls back to the default secondary. + false, + }, + { + "cursored multidispatch to cursored secondary", + "['secondary']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + OptionalCursor: &v1.Cursor{ + Sections: []string{secondaryCursorPrefix + "tertiary"}, + DispatchVersion: 1, + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 3, + false, + }, + { + "cursored multidispatch to cursored secondary that raises an error", + "['secondary']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + OptionalCursor: &v1.Cursor{ + Sections: []string{secondaryCursorPrefix + "error"}, + DispatchVersion: 1, + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 1, + true, // since the secondary was in the cursor, if it errors, the operation fails. + }, + { + "cursored multidispatch to default secondary that raises an error", + "['error']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 1, + false, + }, + { + "cursored multidispatch to unknown secondary", + "['error']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + OptionalCursor: &v1.Cursor{ + Sections: []string{secondaryCursorPrefix + "unknown"}, + DispatchVersion: 1, + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 0, + true, + }, + { + "cursored multidispatch to default secondary", + "['secondary', 'tertiary']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + OptionalCursor: &v1.Cursor{ + Sections: []string{secondaryCursorPrefix + "secondary"}, + DispatchVersion: 1, + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 2, + false, + }, + { + "cursored multidispatch to non-default secondary", + "['tertiary', 'secondary']", + &v1.DispatchLookupResources2Request{ + ResourceRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectRelation: &corev1.RelationReference{ + Namespace: "somenamespace", + Relation: "somerelation", + }, + SubjectIds: []string{"foo"}, + TerminalSubject: &corev1.ObjectAndRelation{ + Namespace: "foo", + ObjectId: "bar", + Relation: "...", + }, + OptionalCursor: &v1.Cursor{ + Sections: []string{secondaryCursorPrefix + "secondary"}, + DispatchVersion: 1, + }, + Metadata: &v1.ResolverMeta{DepthRemaining: 50}, + }, + 2, + false, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + conn := connectionForDispatching(t, &fakeDispatchSvc{dispatchCount: 1, sleepTime: 0 * time.Millisecond}) + secondaryConn := connectionForDispatching(t, &fakeDispatchSvc{dispatchCount: 2, sleepTime: 0 * time.Millisecond}) + tertiaryConn := connectionForDispatching(t, &fakeDispatchSvc{dispatchCount: 3, sleepTime: 0 * time.Millisecond}) + errorConn := connectionForDispatching(t, &fakeDispatchSvc{dispatchCount: 999, sleepTime: 0 * time.Millisecond}) + + parsed, err := ParseDispatchExpression("lookupresources", tc.expr) + require.NoError(t, err) + + dispatcher := NewClusterDispatcher(v1.NewDispatchServiceClient(conn), conn, ClusterDispatcherConfig{ + KeyHandler: &keys.DirectKeyHandler{}, + DispatchOverallTimeout: 30 * time.Second, + }, map[string]SecondaryDispatch{ + "secondary": {Name: "secondary", Client: v1.NewDispatchServiceClient(secondaryConn)}, + "tertiary": {Name: "tertiary", Client: v1.NewDispatchServiceClient(tertiaryConn)}, + "error": {Name: "error", Client: v1.NewDispatchServiceClient(errorConn)}, + }, map[string]*DispatchExpr{ + "lookupresources": parsed, + }) + require.True(t, dispatcher.ReadyState().IsReady) + + stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResources2Response](context.Background()) + err = dispatcher.DispatchLookupResources2(tc.request, stream) + + if tc.expectedError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, 1, len(stream.Results())) + require.Equal(t, tc.expectedDispatchCount, stream.Results()[0].Metadata.DispatchCount) + } + }) + } +} + func connectionForDispatching(t *testing.T, svc v1.DispatchServiceServer) *grpc.ClientConn { listener := bufconn.Listen(humanize.MiByte) s := grpc.NewServer()