diff --git a/cmd/aws-lambda-rie/util.go b/cmd/aws-lambda-rie/util.go index 1d539ec..51563e6 100644 --- a/cmd/aws-lambda-rie/util.go +++ b/cmd/aws-lambda-rie/util.go @@ -5,7 +5,8 @@ package main import ( "fmt" - "net/http") + "net/http" +) type ErrorType int diff --git a/lambda/logging/internal_log_test.go b/lambda/logging/internal_log_test.go index 3ec537f..6ca3992 100644 --- a/lambda/logging/internal_log_test.go +++ b/lambda/logging/internal_log_test.go @@ -164,4 +164,3 @@ func BenchmarkLogrusDebugWithFieldLogLevelDisabledInternalFormatter(b *testing.B l.WithField("field", "value").Debug(1, "two", true) } } - diff --git a/lambda/rapi/handler/runtimelogs_stub.go b/lambda/rapi/handler/runtimelogs_stub.go index f540e9b..40cf5ec 100644 --- a/lambda/rapi/handler/runtimelogs_stub.go +++ b/lambda/rapi/handler/runtimelogs_stub.go @@ -4,7 +4,17 @@ package handler import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io/ioutil" "net/http" + "net/url" + "os" + "strings" + "sync" + "time" log "github.com/sirupsen/logrus" "go.amzn.com/lambda/rapi/model" @@ -16,6 +26,24 @@ const ( telemetryAPIDisabledErrorType = "Telemetry.NotSupported" ) +type runtimeTelemetryBuffering struct { + MaxBytes int64 `json:"maxBytes"` + MaxItems int `json:"maxItems"` + TimeoutMs int64 `json:"timeoutMs"` +} + +type runtimeTelemetryDestination struct { + URI string `json:"URI"` + Protocol string `json:"protocol"` +} + +type runtimeTelemetryRequest struct { + Buffering runtimeTelemetryBuffering `json:"buffering"` + Destination runtimeTelemetryDestination `json:"destination"` + Types []string `json:"types"` + SchemaVersion string `json:"schemaVersion"` +} + type runtimeLogsStubAPIHandler struct{} func (h *runtimeLogsStubAPIHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { @@ -34,9 +62,36 @@ func NewRuntimeLogsAPIStubHandler() http.Handler { return &runtimeLogsStubAPIHandler{} } -type runtimeTelemetryAPIStubHandler struct{} +type runtimeTelemetryAPIStubHandler struct { + destinations []string + mu sync.Mutex +} func (h *runtimeTelemetryAPIStubHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + var runtimeReq runtimeTelemetryRequest + body, err := ioutil.ReadAll(request.Body) + if err != nil { + log.WithError(err).Warn("Error while reading request body") + http.Error(writer, err.Error(), http.StatusInternalServerError) + } + err = json.Unmarshal(body, &runtimeReq) + if err != nil { + log.WithError(err).Warn("Error while unmarshaling request") + http.Error(writer, err.Error(), http.StatusInternalServerError) + } + if len(runtimeReq.Destination.URI) > 0 && runtimeReq.Destination.Protocol == "HTTP" { + u, err := url.Parse(runtimeReq.Destination.URI) + if err != nil { + log.WithError(err).Warn("Error while parsing destination URL") + http.Error(writer, err.Error(), http.StatusInternalServerError) + } + if sep := strings.IndexRune(u.Host, ':'); sep != -1 && u.Host[:sep] == "sandbox" { + u.Host = "localhost" + u.Host[sep:] + } + h.mu.Lock() + h.destinations = append(h.destinations, u.String()) + h.mu.Unlock() + } if err := rendering.RenderJSON(http.StatusAccepted, writer, request, &model.ErrorResponse{ ErrorType: telemetryAPIDisabledErrorType, ErrorMessage: "Telemetry API is not supported", @@ -46,8 +101,55 @@ func (h *runtimeTelemetryAPIStubHandler) ServeHTTP(writer http.ResponseWriter, r } } +type logMessage struct { + Time string `json:"time"` + Type string `json:"type"` + Record string `json:"record"` +} + // NewRuntimeTelemetryAPIStubHandler returns a new instance of http handler // for serving /runtime/logs when a telemetry service implementation is absent func NewRuntimeTelemetryAPIStubHandler() http.Handler { - return &runtimeTelemetryAPIStubHandler{} + handler := runtimeTelemetryAPIStubHandler{} + originalStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + os.Stderr = w + scanner := bufio.NewScanner(r) + scanner.Split(bufio.ScanLines) + go func() { + for { + if len(handler.destinations) > 0 { + var msgs []logMessage + for scanner.Scan() && len(msgs) < 10 { + line := scanner.Text() + originalStdout.WriteString(fmt.Sprintf("%s\n", line)) + msgs = append(msgs, logMessage{ + Time: time.Now().Format("2006-01-02T15:04:05.999Z"), + Type: "function", + Record: line, + }) + } + data, err := json.Marshal(msgs) + if err != nil { + originalStdout.WriteString(fmt.Sprintf("%s\n", err)) + } + bodyReader := bytes.NewReader(data) + handler.mu.Lock() + destinations := handler.destinations + handler.mu.Unlock() + for _, dest := range destinations { + resp, err := http.Post(dest, "application/json", bodyReader) + if err != nil { + originalStdout.WriteString(fmt.Sprintf("%s\n", err)) + } + if resp.StatusCode > 300 { + originalStdout.WriteString(fmt.Sprintf("failed to send logs to destination %q: status %d", dest, resp.StatusCode)) + } + } + } + time.Sleep(5 * time.Second) + } + }() + return &handler } diff --git a/lambda/rapid/handlers.go b/lambda/rapid/handlers.go index f379c4c..b827ff5 100644 --- a/lambda/rapid/handlers.go +++ b/lambda/rapid/handlers.go @@ -41,7 +41,7 @@ const ( const ( // Same value as defined in LambdaSandbox minus 1. - maxExtensionNamesLength = 127 + maxExtensionNamesLength = 127 standaloneShutdownReason = "spindown" )