Skip to content

Commit

Permalink
feat: Pull upstream changes 2023/04 (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
valerena authored Apr 27, 2023
1 parent a08886c commit c1cf0c5
Show file tree
Hide file tree
Showing 122 changed files with 6,845 additions and 2,726 deletions.
31 changes: 25 additions & 6 deletions cmd/aws-lambda-rie/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"strings"
"time"

"go.amzn.com/lambda/core/statejson"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapidcore"
"go.amzn.com/lambda/rapidcore/env"

"github.com/google/uuid"

Expand All @@ -27,6 +29,19 @@ type Sandbox interface {
Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error
}

type InteropServer interface {
Init(i *interop.Init, invokeTimeoutMs int64) error
AwaitInitialized() error
FastInvoke(w http.ResponseWriter, i *interop.Invoke, direct bool) error
Reserve(id string, traceID, lambdaSegmentID string) (*rapidcore.ReserveResponse, error)
Reset(reason string, timeoutMs int64) (*statejson.ResetDescription, error)
AwaitRelease() (*statejson.InternalStateDescription, error)
Shutdown(shutdown *interop.Shutdown) *statejson.InternalStateDescription
InternalState() (*statejson.InternalStateDescription, error)
CurrentToken() *interop.Token
Restore(restore *interop.Restore) error
}

var initDone bool

func GetenvWithDefault(key string, defaultValue string) string {
Expand Down Expand Up @@ -57,7 +72,7 @@ func printEndReports(invokeId string, initDuration string, memorySize string, in
invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize)
}

func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {
func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox, bs interop.Bootstrap) {
log.Debugf("invoke: -> %s %s %v", r.Method, r.URL, r.Header)
bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
Expand All @@ -80,7 +95,7 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {

if !initDone {

initStart, initEnd := InitHandler(sandbox, functionVersion, timeout)
initStart, initEnd := InitHandler(sandbox, functionVersion, timeout, bs)

// Calculate InitDuration
initTimeMS := math.Min(float64(initEnd.Sub(initStart).Nanoseconds()),
Expand All @@ -99,7 +114,6 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {
TraceID: r.Header.Get("X-Amzn-Trace-Id"),
LambdaSegmentID: r.Header.Get("X-Amzn-Segment-Id"),
Payload: bytes.NewReader(bodyBytes),
CorrelationID: "invokeCorrelationID",
}
fmt.Println("START RequestId: " + invokePayload.ID + " Version: " + functionVersion)

Expand Down Expand Up @@ -166,7 +180,7 @@ func InvokeHandler(w http.ResponseWriter, r *http.Request, sandbox Sandbox) {
w.Write(invokeResp.Body)
}

func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.Time, time.Time) {
func InitHandler(sandbox Sandbox, functionVersion string, timeout int64, bs interop.Bootstrap) (time.Time, time.Time) {
additionalFunctionEnvironmentVariables := map[string]string{}

// Add default Env Vars if they were not defined. This is a required otherwise 1p Python2.7, Python3.6, and
Expand All @@ -189,15 +203,20 @@ func InitHandler(sandbox Sandbox, functionVersion string, timeout int64) (time.T
// pass to rapid
sandbox.Init(&interop.Init{
Handler: GetenvWithDefault("AWS_LAMBDA_FUNCTION_HANDLER", os.Getenv("_HANDLER")),
CorrelationID: "initCorrelationID",
AwsKey: os.Getenv("AWS_ACCESS_KEY_ID"),
AwsSecret: os.Getenv("AWS_SECRET_ACCESS_KEY"),
AwsSession: os.Getenv("AWS_SESSION_TOKEN"),
XRayDaemonAddress: "0.0.0.0:0", // TODO
FunctionName: GetenvWithDefault("AWS_LAMBDA_FUNCTION_NAME", "test_function"),
FunctionVersion: functionVersion,

RuntimeInfo: interop.RuntimeInfo{
ImageJSON: "{}",
Arn: "",
Version: ""},
CustomerEnvironmentVariables: additionalFunctionEnvironmentVariables,
SandboxType: interop.SandboxClassic,
Bootstrap: bs,
EnvironmentVariables: env.NewEnvironment(),
}, timeout*1000)
initEnd := time.Now()
return initStart, initEnd
Expand Down
6 changes: 4 additions & 2 deletions cmd/aws-lambda-rie/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"net/http"

log "github.com/sirupsen/logrus"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapidcore"
)

func startHTTPServer(ipport string, sandbox Sandbox) {
func startHTTPServer(ipport string, sandbox *rapidcore.SandboxBuilder, bs interop.Bootstrap) {
srv := &http.Server{
Addr: ipport,
}

// Pass a channel
http.HandleFunc("/2015-03-31/functions/function/invocations", func(w http.ResponseWriter, r *http.Request) {
InvokeHandler(w, r, sandbox)
InvokeHandler(w, r, sandbox.LambdaInvokeAPI(), bs)
})

// go routine (main thread waits)
Expand Down
51 changes: 44 additions & 7 deletions cmd/aws-lambda-rie/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
"fmt"
"net"
"os"
"runtime/debug"

Expand All @@ -21,20 +22,49 @@ const (
)

type options struct {
LogLevel string `long:"log-level" default:"info" description:"log level"`
LogLevel string `long:"log-level" description:"The level of AWS Lambda Runtime Interface Emulator logs to display. Can also be set by the environment variable 'LOG_LEVEL'. Defaults to the value 'info'."`
InitCachingEnabled bool `long:"enable-init-caching" description:"Enable support for Init Caching"`
// Do not have a default value so we do not need to keep it in sync with the default value in lambda/rapidcore/sandbox_builder.go
RuntimeAPIAddress string `long:"runtime-api-address" description:"The address of the AWS Lambda Runtime API to communicate with the Lambda execution environment."`
RuntimeInterfaceEmulatorAddress string `long:"runtime-interface-emulator-address" default:"0.0.0.0:8080" description:"The address for the AWS Lambda Runtime Interface Emulator to accept HTTP request upon."`
}

func main() {
// More frequent GC reduces the tail latencies, equivalent to export GOGC=33
debug.SetGCPercent(33)

opts, args := getCLIArgs()
rapidcore.SetLogLevel(opts.LogLevel)

logLevel := "info"

// If you specify an option by using a parameter on the CLI command line, it overrides any value from either the corresponding environment variable.
//
// https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
if opts.LogLevel != "" {
logLevel = opts.LogLevel
} else if envLogLevel, envLogLevelSet := os.LookupEnv("LOG_LEVEL"); envLogLevelSet {
logLevel = envLogLevel
}

rapidcore.SetLogLevel(logLevel)

if opts.RuntimeAPIAddress != "" {
_, _, err := net.SplitHostPort(opts.RuntimeAPIAddress)

if err != nil {
log.WithError(err).Fatalf("The command line value for \"--runtime-api-address\" is not a valid network address %q.", opts.RuntimeAPIAddress)
}
}

_, _, err := net.SplitHostPort(opts.RuntimeInterfaceEmulatorAddress)

if err != nil {
log.WithError(err).Fatalf("The command line value for \"--runtime-interface-emulator-address\" is not a valid network address %q.", opts.RuntimeInterfaceEmulatorAddress)
}

bootstrap, handler := getBootstrap(args, opts)
sandbox := rapidcore.
NewSandboxBuilder(bootstrap).
NewSandboxBuilder().
AddShutdownFunc(context.CancelFunc(func() { os.Exit(0) })).
SetExtensionsFlag(true).
SetInitCachingFlag(opts.InitCachingEnabled)
Expand All @@ -43,10 +73,17 @@ func main() {
sandbox.SetHandler(handler)
}

go sandbox.Create()
if opts.RuntimeAPIAddress != "" {
sandbox.SetRuntimeAPIAddress(opts.RuntimeAPIAddress)
}

sandboxContext, internalStateFn := sandbox.Create()
// Since we have not specified a custom interop server for standalone, we can
// directly reference the default interop server, which is a concrete type
sandbox.DefaultInteropServer().SetSandboxContext(sandboxContext)
sandbox.DefaultInteropServer().SetInternalStateGetter(internalStateFn)

testAPIipport := "0.0.0.0:8080"
startHTTPServer(testAPIipport, sandbox)
startHTTPServer(opts.RuntimeInterfaceEmulatorAddress, sandbox, bootstrap)
}

func getCLIArgs() (options, []string) {
Expand Down Expand Up @@ -112,5 +149,5 @@ func getBootstrap(args []string, opts options) (*rapidcore.Bootstrap, string) {
log.Panic("insufficient arguments: bootstrap not provided")
}

return rapidcore.NewBootstrapSingleCmd(bootstrapLookupCmd, currentWorkingDir), handler
return rapidcore.NewBootstrapSingleCmd(bootstrapLookupCmd, currentWorkingDir, ""), handler
}
75 changes: 18 additions & 57 deletions lambda/agents/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,38 @@
package agents

import (
"fmt"
"io"
"io/ioutil"
"os/exec"
"os"
"path"
"syscall"
"path/filepath"

log "github.com/sirupsen/logrus"
)

// AgentProcess is the common interface exposed by both internal and external agent processes
type AgentProcess interface {
Name() string
}

// ExternalAgentProcess represents an external agent process
type ExternalAgentProcess struct {
cmd *exec.Cmd
}

// NewExternalAgentProcess returns a new external agent process
func NewExternalAgentProcess(path string, env []string, stdoutWriter io.Writer, stderrWriter io.Writer) ExternalAgentProcess {
command := exec.Command(path)
command.Env = env

command.Stdout = NewNewlineSplitWriter(stdoutWriter)
command.Stderr = NewNewlineSplitWriter(stderrWriter)
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

return ExternalAgentProcess{
cmd: command,
}
}

// Name returns the name of the agent
// For external agents is the executable name
func (a *ExternalAgentProcess) Name() string {
return path.Base(a.cmd.Path)
}

func (a *ExternalAgentProcess) Pid() int {
return a.cmd.Process.Pid
}

// Start starts an external agent process
func (a *ExternalAgentProcess) Start() error {
return a.cmd.Start()
}

// Wait waits for the external agent process to exit
func (a *ExternalAgentProcess) Wait() error {
return a.cmd.Wait()
}

// String is used to print values passed as an operand to any format that accepts a string or to an unformatted printer such as Print.
func (a *ExternalAgentProcess) String() string {
return fmt.Sprintf("%s (%s)", a.Name(), a.cmd.Path)
}

// ListExternalAgentPaths return a list of external agents found in a given directory
func ListExternalAgentPaths(root string) []string {
func ListExternalAgentPaths(dir string, root string) []string {
var agentPaths []string
files, err := ioutil.ReadDir(root)
if !isCanonical(dir) || !isCanonical(root) {
log.Warningf("Agents base paths are not absolute and in canonical form: %s, %s", dir, root)
return agentPaths
}
fullDir := path.Join(root, dir)
files, err := os.ReadDir(fullDir)
if err != nil {
log.WithError(err).Warning("Cannot list external agents")
return agentPaths
}
for _, file := range files {
if !file.IsDir() {
agentPaths = append(agentPaths, path.Join(root, file.Name()))
// The returned path is absolute wrt to `root`. This allows
// to exec the agents in their own mount namespace
p := path.Join("/", dir, file.Name())
agentPaths = append(agentPaths, p)
}
}
return agentPaths
}

func isCanonical(path string) bool {
absPath, err := filepath.Abs(path)
return err == nil && absPath == path
}
Loading

0 comments on commit c1cf0c5

Please sign in to comment.