Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for secondary dispatching on LR2 #2069

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 148 additions & 32 deletions internal/dispatch/remote/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,150 @@ func dispatchRequest[Q requestMessage, S responseMessage](ctx context.Context, c
return *new(S), foundError
}

type requestMessageWithCursor interface {
requestMessage
GetOptionalCursor() *v1.Cursor
}

type responseMessageWithCursor interface {
responseMessage
GetAfterResponseCursor() *v1.Cursor
}

type receiver[S responseMessage] interface {
Recv() (S, error)
grpc.ClientStream
}

const (
secondaryCursorPrefix = "$$secondary:"
primaryDispatcher = ""
)

func publishClient[Q requestMessageWithCursor, R responseMessageWithCursor](ctx context.Context, client receiver[R], stream dispatch.Stream[R], secondaryDispatchName string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is Q used anywhere?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its used for the type of the incoming message, off of which I grab the cursor. I could have the req just be that interface directly, but I always prefer well-typed wherever possible

for {
select {
case <-ctx.Done():
return ctx.Err()

default:
result, err := client.Recv()
if errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return err
}

merr := adjustMetadataForDispatch(result.GetMetadata())
if merr != nil {
return merr
}

if secondaryDispatchName != primaryDispatcher {
afterResponseCursor := result.GetAfterResponseCursor()
if afterResponseCursor == nil {
return spiceerrors.MustBugf("received a nil after response cursor for secondary dispatch")
}
afterResponseCursor.Sections = append([]string{secondaryCursorPrefix + secondaryDispatchName}, afterResponseCursor.Sections...)
}

serr := stream.Publish(result)
if serr != nil {
return serr
}
}
}
}

// dispatchStreamingRequest handles the dispatching of a streaming request to the primary and any
// secondary dispatchers. Unlike the non-streaming version, this will first attempt to dispatch
// from the allowed secondary dispatchers before falling back to the primary, rather than running
// them in parallel.
func dispatchStreamingRequest[Q requestMessageWithCursor, R responseMessageWithCursor](
ctx context.Context,
cr *clusterDispatcher,
reqKey string,
req Q,
stream dispatch.Stream[R],
handler func(context.Context, ClusterClient) (receiver[R], error),
) error {
withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
defer cancelFn()

client, err := handler(withTimeout, cr.clusterClient)
if err != nil {
return err
}

// Check the cursor to see if the dispatch went to one of the secondary endpoints.
cursor := req.GetOptionalCursor()
cursorLockedSecondaryName := ""
if cursor != nil && len(cursor.Sections) > 0 {
if strings.HasPrefix(cursor.Sections[0], secondaryCursorPrefix) {
cursorLockedSecondaryName = strings.TrimPrefix(cursor.Sections[0], secondaryCursorPrefix)
cursor.Sections = cursor.Sections[1:]
}
}

// If no secondary dispatches are defined, just invoke directly.
if len(cr.secondaryDispatchExprs) == 0 || len(cr.secondaryDispatch) == 0 {
return publishClient[Q](withTimeout, client, stream, primaryDispatcher)
}

// If the cursor is locked to a known secondary, dispatch to it.
if cursorLockedSecondaryName != "" {
secondary, ok := cr.secondaryDispatch[cursorLockedSecondaryName]
if ok {
secondaryClient, err := handler(withTimeout, secondary.Client)
if err != nil {
return err
}

log.Debug().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher based on cursor")
return publishClient[Q](withTimeout, secondaryClient, stream, cursorLockedSecondaryName)
}

return fmt.Errorf("unknown secondary dispatcher in cursor: %s", cursorLockedSecondaryName)
}

// Otherwise, look for a matching expression for the initial secondary dispatch
// and, if present, try to dispatch to it.
expr, ok := cr.secondaryDispatchExprs[reqKey]
if !ok {
return publishClient[Q](withTimeout, client, stream, primaryDispatcher)
}

result, err := RunDispatchExpr(expr, req)
if err != nil {
log.Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
}

for _, secondaryDispatchName := range result {
secondary, ok := cr.secondaryDispatch[secondaryDispatchName]
if !ok {
log.Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
continue
}

log.Trace().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher")
secondaryClient, err := handler(withTimeout, secondary.Client)
if err != nil {
log.Warn().Str("secondary-dispatcher", secondary.Name).Err(err).Msg("failed to create secondary dispatch client")
continue
}

if err := publishClient[Q](withTimeout, secondaryClient, stream, secondaryDispatchName); err != nil {
log.Warn().Str("secondary-dispatcher", secondary.Name).Err(err).Msg("failed to publish secondary dispatch response")
continue
}

return nil
}

// Fallback: use the primary client if no secondary matched.
return publishClient[Q](withTimeout, client, stream, primaryDispatcher)
}

func adjustMetadataForDispatch(metadata *v1.ResponseMeta) error {
if metadata == nil {
return spiceerrors.MustBugf("received a nil metadata")
Expand Down Expand Up @@ -370,38 +514,10 @@ func (cr *clusterDispatcher) DispatchLookupResources2(
return err
}

withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
defer cancelFn()

client, err := cr.clusterClient.DispatchLookupResources2(withTimeout, req)
if err != nil {
return err
}

for {
select {
case <-withTimeout.Done():
return withTimeout.Err()

default:
result, err := client.Recv()
if errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return err
}

merr := adjustMetadataForDispatch(result.Metadata)
if merr != nil {
return merr
}

serr := stream.Publish(result)
if serr != nil {
return serr
}
}
}
return dispatchStreamingRequest(ctx, cr, "lookupresources", req, stream,
func(ctx context.Context, client ClusterClient) (receiver[*v1.DispatchLookupResources2Response], error) {
return client.DispatchLookupResources2(ctx, req)
})
}

func (cr *clusterDispatcher) DispatchLookupSubjects(
Expand Down
Loading
Loading