From 2023214192289a04ff28904d7cdd756cbfc9d14f Mon Sep 17 00:00:00 2001 From: Oliver Ford Date: Thu, 19 Sep 2024 21:47:18 +0100 Subject: [PATCH] Fix already reserved panic on concurrent invokes Closes #97. --- lambda/rapidcore/server.go | 50 +++----------------------------------- 1 file changed, 3 insertions(+), 47 deletions(-) diff --git a/lambda/rapidcore/server.go b/lambda/rapidcore/server.go index ed4b029..42e7b2f 100644 --- a/lambda/rapidcore/server.go +++ b/lambda/rapidcore/server.go @@ -107,59 +107,35 @@ type Server struct { var _ interop.Server = (*Server)(nil) func (s *Server) setRapidPhase(phase rapidPhase) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.rapidPhase = phase } func (s *Server) getRapidPhase() rapidPhase { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.rapidPhase } func (s *Server) setRuntimeState(state runtimeState) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.runtimeState = state } func (s *Server) getRuntimeState() runtimeState { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.runtimeState } func (s *Server) SetInvokeTimeout(timeout time.Duration) { - s.mutex.Lock() - defer s.mutex.Unlock() - s.invokeTimeout = timeout } func (s *Server) GetInvokeTimeout() time.Duration { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.invokeTimeout } func (s *Server) GetInvokeContext() *InvokeContext { - s.mutex.Lock() - defer s.mutex.Unlock() - ctx := *s.invokeCtx return &ctx } func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID string) (*ReserveResponse, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.invokeCtx != nil { return nil, ErrAlreadyReserved } @@ -226,9 +202,6 @@ func (s *Server) awaitInitCompletion() { } func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.invokeCtx == nil { return "", ErrNotReserved } @@ -248,9 +221,6 @@ func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, err // Release closes the invocation, making server ready for reserve again func (s *Server) Release() error { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.invokeCtx == nil { return ErrNotReserved } @@ -267,9 +237,6 @@ func (s *Server) Release() error { // GetCurrentInvokeID func (s *Server) GetCurrentInvokeID() string { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.invokeCtx == nil { return "" } @@ -350,8 +317,6 @@ func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[strin func (s *Server) SendResponse(invokeID string, resp *interop.StreamableInvokeResponse) error { s.setRuntimeState(runtimeInvokeResponseSent) - s.mutex.Lock() - defer s.mutex.Unlock() runtimeCalledResponse := true return s.sendResponseUnsafe(invokeID, resp.Headers, resp.Payload, resp.Trailers, resp.Request, runtimeCalledResponse) } @@ -372,8 +337,6 @@ func (s *Server) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error func (s *Server) SendErrorResponse(invokeID string, resp *interop.ErrorInvokeResponse) error { log.Debugf("Sending Error Response: %s", resp.FunctionError.Type) s.setRuntimeState(runtimeInvokeError) - s.mutex.Lock() - defer s.mutex.Unlock() additionalHeaders := map[string]string{ directinvoke.ContentTypeHeader: resp.Headers.ContentType, directinvoke.ErrorTypeHeader: string(resp.FunctionError.Type), @@ -501,14 +464,10 @@ func deadlineNsFromTimeoutMs(timeoutMs int64) int64 { } func (s *Server) setInitFailuresChan() { - s.mutex.Lock() - defer s.mutex.Unlock() s.initFailures = make(chan interop.InitFailure) } func (s *Server) getInitFailuresChan() chan interop.InitFailure { - s.mutex.Lock() - defer s.mutex.Unlock() return s.initFailures } @@ -593,14 +552,10 @@ func (s *Server) FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct boo } func (s *Server) setCachedInitErrorResponse(errResp *interop.ErrorInvokeResponse) { - s.mutex.Lock() - defer s.mutex.Unlock() s.cachedInitErrorResponse = errResp } func (s *Server) getCachedInitErrorResponse() *interop.ErrorInvokeResponse { - s.mutex.Lock() - defer s.mutex.Unlock() return s.cachedInitErrorResponse } @@ -613,8 +568,6 @@ func (s *Server) trySendDefaultErrorResponse(resp *interop.ErrorInvokeResponse) } func (s *Server) CurrentToken() *interop.Token { - s.mutex.Lock() - defer s.mutex.Unlock() if s.invokeCtx == nil { return nil } @@ -625,6 +578,9 @@ func (s *Server) CurrentToken() *interop.Token { // Invoke is used by the Runtime Interface Emulator (Rapid Local) // https://github.com/aws/aws-lambda-runtime-interface-emulator func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error { + s.mutex.Lock() + defer s.mutex.Unlock() + resetCtx, resetCancel := context.WithCancel(context.Background()) defer resetCancel()