From ddc62d9501165b06504058fc96a178d21d1dc2f5 Mon Sep 17 00:00:00 2001 From: Dominik Schubert Date: Tue, 13 Feb 2024 17:04:11 +0100 Subject: [PATCH] Fix over-sized response handling (#33) --- cmd/localstack/custom_interop.go | 7 +++++-- cmd/localstack/main.go | 11 +++++++++++ debugging/Makefile | 2 +- lambda/core/directinvoke/directinvoke.go | 5 +++-- lambda/interop/model.go | 7 ++++--- lambda/rapi/rendering/rendering.go | 3 ++- 6 files changed, 26 insertions(+), 9 deletions(-) diff --git a/cmd/localstack/custom_interop.go b/cmd/localstack/custom_interop.go index 2bd3541..1941668 100644 --- a/cmd/localstack/custom_interop.go +++ b/cmd/localstack/custom_interop.go @@ -6,6 +6,7 @@ package main import ( "bytes" "encoding/json" + "errors" "fmt" "github.com/go-chi/chi" log "github.com/sirupsen/logrus" @@ -117,8 +118,8 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto timeout := int(server.delegate.GetInvokeTimeout().Seconds()) isErr := false if err != nil { - switch err { - case rapidcore.ErrInvokeTimeout: + switch { + case errors.Is(err, rapidcore.ErrInvokeTimeout): log.Debugf("Got invoke timeout") isErr = true errorResponse := ErrorResponse{ @@ -137,6 +138,8 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto if err != nil { log.Fatalln("unable to write to response") } + case errors.Is(err, rapidcore.ErrInvokeDoneFailed): + // we can actually just continue here, error message is sent below default: log.Fatalln(err) } diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index e6eb026..e936d78 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -5,6 +5,7 @@ package main import ( "context" log "github.com/sirupsen/logrus" + "go.amzn.com/lambda/interop" "go.amzn.com/lambda/rapidcore" "os" "runtime/debug" @@ -28,6 +29,7 @@ type LsOpts struct { EdgePort string EnableXRayTelemetry string PostInvokeWaitMS string + MaxPayloadSize string } func GetEnvOrDie(env string) string { @@ -50,6 +52,7 @@ func InitLsOpts() *LsOpts { User: GetenvWithDefault("LOCALSTACK_USER", "sbx_user1051"), InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "warn"), EdgePort: GetenvWithDefault("EDGE_PORT", "4566"), + MaxPayloadSize: GetenvWithDefault("LOCALSTACK_MAX_PAYLOAD_SIZE", "6291556"), // optional or empty CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"), HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","), @@ -77,6 +80,7 @@ func UnsetLsEnvs() { "LOCALSTACK_INIT_LOG_LEVEL", "LOCALSTACK_POST_INVOKE_WAIT_MS", "LOCALSTACK_FUNCTION_ACCOUNT_ID", + "LOCALSTACK_MAX_PAYLOAD_SIZE", // Docker container ID "HOSTNAME", @@ -128,6 +132,13 @@ func main() { log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL") } + // patch MaxPayloadSize + payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize) + if err != nil { + log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE") + } + interop.MaxPayloadSize = payloadSize + // enable dns server dnsServerContext, stopDnsServer := context.WithCancel(context.Background()) go RunDNSRewriter(lsOpts, dnsServerContext) diff --git a/debugging/Makefile b/debugging/Makefile index 9bd3e35..fe3a68f 100644 --- a/debugging/Makefile +++ b/debugging/Makefile @@ -1,5 +1,5 @@ # Golang EOL overview: https://endoflife.date/go -DOCKER_GOLANG_IMAGE ?= golang:1.19 +DOCKER_GOLANG_IMAGE ?= golang:1.20-bullseye # On ARM hosts, use: make ARCH=arm64 build-init # Check host architecture: uname -m diff --git a/lambda/core/directinvoke/directinvoke.go b/lambda/core/directinvoke/directinvoke.go index 3510132..396bd39 100644 --- a/lambda/core/directinvoke/directinvoke.go +++ b/lambda/core/directinvoke/directinvoke.go @@ -1,5 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +// LOCALSTACK CHANGES 2024-02-13: casting of MaxPayloadSize package directinvoke @@ -51,7 +52,7 @@ var ResetReasonMap = map[string]fatalerror.ErrorType{ "timeout": fatalerror.SandboxTimeout, } -var MaxDirectResponseSize int64 = interop.MaxPayloadSize // this is intentionally not a constant so we can configure it via CLI +var MaxDirectResponseSize = int64(interop.MaxPayloadSize) // this is intentionally not a constant so we can configure it via CLI var ResponseBandwidthRate int64 = interop.ResponseBandwidthRate var ResponseBandwidthBurstSize int64 = interop.ResponseBandwidthBurstSize @@ -104,7 +105,7 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T now := metering.Monotime() - MaxDirectResponseSize = interop.MaxPayloadSize + MaxDirectResponseSize = int64(interop.MaxPayloadSize) if maxPayloadSize := r.Header.Get(MaxPayloadSizeHeader); maxPayloadSize != "" { if n, err := strconv.ParseInt(maxPayloadSize, 10, 64); err == nil && n >= -1 { MaxDirectResponseSize = n diff --git a/lambda/interop/model.go b/lambda/interop/model.go index a4bdbf4..ee7bb2a 100644 --- a/lambda/interop/model.go +++ b/lambda/interop/model.go @@ -1,5 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +// LOCALSTACK CHANGES 2024-02-13: adjust error message for ErrorResponseTooLarge to be in parity with what AWS returns; make MaxPayloadSize adjustable package interop @@ -18,10 +19,10 @@ import ( log "github.com/sirupsen/logrus" ) +var MaxPayloadSize int = 6*1024*1024 + 100 // 6 MiB + 100 bytes + // MaxPayloadSize max event body size declared as LAMBDA_EVENT_BODY_SIZE const ( - MaxPayloadSize = 6*1024*1024 + 100 // 6 MiB + 100 bytes - ResponseBandwidthRate = 2 * 1024 * 1024 // default average rate of 2 MiB/s ResponseBandwidthBurstSize = 6 * 1024 * 1024 // default burst size of 6 MiB @@ -355,7 +356,7 @@ type ErrorResponseTooLargeDI struct { // ErrorResponseTooLarge is returned when response provided by Runtime does not fit into shared memory buffer func (s *ErrorResponseTooLarge) Error() string { - return fmt.Sprintf("Response payload size (%d bytes) exceeded maximum allowed payload size (%d bytes).", s.ResponseSize, s.MaxResponseSize) + return fmt.Sprintf("Response payload size exceeded maximum allowed payload size (%d bytes).", s.MaxResponseSize) } // AsErrorResponse generates ErrorInvokeResponse from ErrorResponseTooLarge diff --git a/lambda/rapi/rendering/rendering.go b/lambda/rapi/rendering/rendering.go index 9a9d77b..08de1e3 100644 --- a/lambda/rapi/rendering/rendering.go +++ b/lambda/rapi/rendering/rendering.go @@ -1,5 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +// LOCALSTACK CHANGES 2024-02-13: casting of MaxPayloadSize package rendering @@ -174,7 +175,7 @@ func (s *InvokeRenderer) bufferInvokeRequest() error { defer s.requestMutex.Unlock() var err error = nil if s.requestBuffer.Len() == 0 { - reader := io.LimitReader(s.invoke.Payload, interop.MaxPayloadSize) + reader := io.LimitReader(s.invoke.Payload, int64(interop.MaxPayloadSize)) start := time.Now() _, err = s.requestBuffer.ReadFrom(reader) s.metrics = InvokeRendererMetrics{