Skip to content

Commit

Permalink
Fix already reserved panic on concurrent invokes
Browse files Browse the repository at this point in the history
Closes aws#97.
  • Loading branch information
OJFord committed Sep 19, 2024
1 parent ff909ae commit 2023214
Showing 1 changed file with 3 additions and 47 deletions.
50 changes: 3 additions & 47 deletions lambda/rapidcore/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,59 +107,35 @@ type Server struct {
var _ interop.Server = (*Server)(nil)

func (s *Server) setRapidPhase(phase rapidPhase) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.rapidPhase = phase
}

func (s *Server) getRapidPhase() rapidPhase {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.rapidPhase
}

func (s *Server) setRuntimeState(state runtimeState) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.runtimeState = state
}

func (s *Server) getRuntimeState() runtimeState {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.runtimeState
}

func (s *Server) SetInvokeTimeout(timeout time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.invokeTimeout = timeout
}

func (s *Server) GetInvokeTimeout() time.Duration {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.invokeTimeout
}

func (s *Server) GetInvokeContext() *InvokeContext {
s.mutex.Lock()
defer s.mutex.Unlock()

ctx := *s.invokeCtx
return &ctx
}

func (s *Server) setNewInvokeContext(invokeID string, traceID, lambdaSegmentID string) (*ReserveResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx != nil {
return nil, ErrAlreadyReserved
}
Expand Down Expand Up @@ -226,9 +202,6 @@ func (s *Server) awaitInitCompletion() {
}

func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return "", ErrNotReserved
}
Expand All @@ -248,9 +221,6 @@ func (s *Server) setReplyStream(w http.ResponseWriter, direct bool) (string, err

// Release closes the invocation, making server ready for reserve again
func (s *Server) Release() error {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return ErrNotReserved
}
Expand All @@ -267,9 +237,6 @@ func (s *Server) Release() error {

// GetCurrentInvokeID
func (s *Server) GetCurrentInvokeID() string {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.invokeCtx == nil {
return ""
}
Expand Down Expand Up @@ -350,8 +317,6 @@ func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[strin

func (s *Server) SendResponse(invokeID string, resp *interop.StreamableInvokeResponse) error {
s.setRuntimeState(runtimeInvokeResponseSent)
s.mutex.Lock()
defer s.mutex.Unlock()
runtimeCalledResponse := true
return s.sendResponseUnsafe(invokeID, resp.Headers, resp.Payload, resp.Trailers, resp.Request, runtimeCalledResponse)
}
Expand All @@ -372,8 +337,6 @@ func (s *Server) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error
func (s *Server) SendErrorResponse(invokeID string, resp *interop.ErrorInvokeResponse) error {
log.Debugf("Sending Error Response: %s", resp.FunctionError.Type)
s.setRuntimeState(runtimeInvokeError)
s.mutex.Lock()
defer s.mutex.Unlock()
additionalHeaders := map[string]string{
directinvoke.ContentTypeHeader: resp.Headers.ContentType,
directinvoke.ErrorTypeHeader: string(resp.FunctionError.Type),
Expand Down Expand Up @@ -501,14 +464,10 @@ func deadlineNsFromTimeoutMs(timeoutMs int64) int64 {
}

func (s *Server) setInitFailuresChan() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.initFailures = make(chan interop.InitFailure)
}

func (s *Server) getInitFailuresChan() chan interop.InitFailure {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.initFailures
}

Expand Down Expand Up @@ -593,14 +552,10 @@ func (s *Server) FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct boo
}

func (s *Server) setCachedInitErrorResponse(errResp *interop.ErrorInvokeResponse) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.cachedInitErrorResponse = errResp
}

func (s *Server) getCachedInitErrorResponse() *interop.ErrorInvokeResponse {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.cachedInitErrorResponse
}

Expand All @@ -613,8 +568,6 @@ func (s *Server) trySendDefaultErrorResponse(resp *interop.ErrorInvokeResponse)
}

func (s *Server) CurrentToken() *interop.Token {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.invokeCtx == nil {
return nil
}
Expand All @@ -625,6 +578,9 @@ func (s *Server) CurrentToken() *interop.Token {
// Invoke is used by the Runtime Interface Emulator (Rapid Local)
// https://github.com/aws/aws-lambda-runtime-interface-emulator
func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
s.mutex.Lock()
defer s.mutex.Unlock()

resetCtx, resetCancel := context.WithCancel(context.Background())
defer resetCancel()

Expand Down

0 comments on commit 2023214

Please sign in to comment.