From 6e9761d8c44aa739198046c0b2a6b83e3b4dfddf Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 16 Jan 2026 17:06:56 +0100 Subject: [PATCH] feat: Support Managed Instances entrypoint --- cmd/localstack/main.go | 208 ++++++----------- cmd/localstack/run_managed.go | 146 ++++++++++++ cmd/localstack/run_standard.go | 139 ++++++++++++ cmd/localstack/server.go | 211 ++++++++++++++++++ .../aws-lambda-rie/entrypoint.go | 109 +++++++++ 5 files changed, 673 insertions(+), 140 deletions(-) create mode 100644 cmd/localstack/run_managed.go create mode 100644 cmd/localstack/run_standard.go create mode 100644 cmd/localstack/server.go create mode 100644 internal/lambda-managed-instances/aws-lambda-rie/entrypoint.go diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index 9bd5b93..93af404 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -1,17 +1,15 @@ -// main entrypoint of init -// initial structure based upon /cmd/aws-lambda-rie/main.go package main import ( - "context" + "io" + "log/slog" "os" "runtime/debug" - "strconv" "strings" - "time" - "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" - "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore" + mlogging "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/logging" + + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapidcore/env" log "github.com/sirupsen/logrus" ) @@ -105,160 +103,90 @@ func main() { UnsetLsEnvs() // set up logging following the Logrus logging levels: https://github.com/sirupsen/logrus#level-logging + configureLogging(lsOpts.InitLogLevel) + + // Download code archives + if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil { + log.Fatal("Failed to download code archives: " + err.Error()) + } + + if err := AdaptFilesystemPermissions(lsOpts.ChmodPaths); err != nil { + log.Warnln("Could not change file mode of code directories:", err) + } + + // Check if running in managed mode + if _, ok := os.LookupEnv(env.AWS_LAMBDA_MAX_CONCURRENCY); ok { + runManaged(lsOpts) + return + } + + runStandard(lsOpts) +} + +func doInitDaemon(addr, port string, enable bool, lvl string) *Daemon { + endpoint := "http://" + addr + ":" + port + xrayConfig := initConfig(endpoint, getXRayLogLevel(lvl)) + d := initDaemon(xrayConfig, enable) + runDaemon(d) + return d +} + +func configureManagedLogger(logLevel string) { + level := slogLevelFromString(logLevel) + slog.SetDefault(mlogging.CreateNewLogger(level, io.Writer(os.Stderr))) +} + +func configureStandardLogger(logLevel string) { + log.SetOutput(os.Stderr) +} + +func configureLogging(logLevel string) { log.SetReportCaller(true) - // https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html - xRayLogLevel := "info" - switch lsOpts.InitLogLevel { + switch logLevel { case "trace": log.SetFormatter(&log.JSONFormatter{}) log.SetLevel(log.TraceLevel) - xRayLogLevel = "debug" case "debug": log.SetLevel(log.DebugLevel) - xRayLogLevel = "debug" case "info": log.SetLevel(log.InfoLevel) case "warn": log.SetLevel(log.WarnLevel) - xRayLogLevel = "warn" case "error": log.SetLevel(log.ErrorLevel) - xRayLogLevel = "error" case "fatal": log.SetLevel(log.FatalLevel) - xRayLogLevel = "error" case "panic": log.SetLevel(log.PanicLevel) - xRayLogLevel = "error" default: log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL") } +} - // patch MaxPayloadSize - payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize) - if err != nil { - 
log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE") - } - interop.MaxPayloadSize = payloadSize - - // download code archive if env variable is set - if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil { - log.Fatal("Failed to download code archives: " + err.Error()) - } - - if err := AdaptFilesystemPermissions(lsOpts.ChmodPaths); err != nil { - log.Warnln("Could not change file mode of code directories:", err) - } - - // parse CLI args - bootstrap, handler := getBootstrap(os.Args) - - // Switch to non-root user and drop root privileges - if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" { - uid := 993 - gid := 990 - AddUser(lsOpts.User, uid, gid) - if err := os.Chown("/tmp", uid, gid); err != nil { - log.Warnln("Could not change owner of directory /tmp:", err) - } - UserLogger().Debugln("Process running as root user.") - err := DropPrivileges(lsOpts.User) - if err != nil { - log.Warnln("Could not drop root privileges.", err) - } else { - UserLogger().Debugln("Process running as non-root user.") - } - } - - // file watcher for hot-reloading - fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background()) - - logCollector := NewLogCollector() - localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector) - tracer := NewLocalStackTracer() - - // build sandbox - sandbox := rapidcore. - NewSandboxBuilder(). - //SetTracer(tracer). - AddShutdownFunc(func() { - log.Debugln("Stopping file watcher") - cancelFileWatcher() - }). - SetExtensionsFlag(true). - SetInitCachingFlag(true). - SetLogsEgressAPI(localStackLogsEgressApi). - SetTracer(tracer) - - // Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable. - // We need to ensure the runtime server is up before the INIT phase, - // but this envar is only set after the InitHandler is called. 
- runtimeAPIAddress := "127.0.0.1:9001" - sandbox.SetRuntimeAPIAddress(runtimeAPIAddress) - - // xray daemon - endpoint := "http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort - xrayConfig := initConfig(endpoint, xRayLogLevel) - d := initDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1") - sandbox.AddShutdownFunc(func() { - log.Debugln("Shutting down xray daemon") - d.stop() - log.Debugln("Flushing segments in xray daemon") - d.close() - }) - runDaemon(d) // async - - defaultInterop := sandbox.DefaultInteropServer() - interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector) - sandbox.SetInteropServer(interopServer) - if len(handler) > 0 { - sandbox.SetHandler(handler) - } - exitChan := make(chan struct{}) - sandbox.AddShutdownFunc(func() { - exitChan <- struct{}{} - }) - - // initialize all flows and start runtime API - sandboxContext, internalStateFn := sandbox.Create() - // Populate our custom interop server - interopServer.SetSandboxContext(sandboxContext) - interopServer.SetInternalStateGetter(internalStateFn) - - // get timeout - invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing - invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) - if err != nil { - log.Fatalln(err) - } - go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy) - - log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddress) - // Fixes https://github.com/localstack/localstack/issues/12680 - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - if err := waitForRuntimeAPI(ctx, runtimeAPIAddress); err != nil { - log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddress, err.Error()) - } - cancel() - - // start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the - // notification channels and status fields are properly initialized before `AwaitInitialized` - log.Debugln("Starting runtime init.") - InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init - - log.Debugln("Awaiting initialization of runtime init.") - if err := interopServer.delegate.AwaitInitialized(); err != nil { - // Error cases: ErrInitDoneFailed or ErrInitResetReceived - log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.") - // NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the - // callback SendInitErrorResponse because it contains the correct error response payload. - return +func slogLevelFromString(logLevel string) slog.Level { + switch logLevel { + case "trace", "debug": + return slog.LevelDebug + case "info": + return slog.LevelInfo + case "warn": + return slog.LevelWarn + case "error", "fatal", "panic": + return slog.LevelError + default: + return slog.LevelInfo } +} - log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.") - if err := interopServer.localStackAdapter.SendStatus(Ready, []byte{}); err != nil { - log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". 
Exiting.") +func getXRayLogLevel(initLogLevel string) string { + switch initLogLevel { + case "trace", "debug": + return "debug" + case "warn": + return "warn" + case "error", "fatal", "panic": + return "error" + default: + return "info" } - - <-exitChan } diff --git a/cmd/localstack/run_managed.go b/cmd/localstack/run_managed.go new file mode 100644 index 0000000..4b15ae9 --- /dev/null +++ b/cmd/localstack/run_managed.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + rie "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/model" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/supervisor/local" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/utils" + log "github.com/sirupsen/logrus" +) + +func runManaged(lsOpts *LsOpts) { + configureManagedLogger(lsOpts.InitLogLevel) + + // Initialize X-Ray daemon + d := doInitDaemon( + lsOpts.LocalstackIP, + lsOpts.EdgePort, + lsOpts.EnableXRayTelemetry == "1", + lsOpts.InitLogLevel, + ) + + defer func() { + log.Debugln("Shutting down xray daemon") + d.stop() + log.Debugln("Flushing segments in xray daemon") + d.close() + }() + + var credential *syscall.Credential + if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" { + uid := 993 + gid := 990 + AddUser(lsOpts.User, uid, gid) + if err := os.Chown("/tmp", uid, gid); err != nil { + log.Warnln("Could not change owner of directory /tmp:", err) + } + + credential = &syscall.Credential{ + Uid: uint32(uid), + Gid: uint32(gid), + } + + UserLogger().Debugln("Configured runtime to run as non-root user:", lsOpts.User) + } + + adapter := LocalStackAdapter{ + UpstreamEndpoint: lsOpts.RuntimeEndpoint, + RuntimeId: lsOpts.RuntimeId, + } + + rieAddr := fmt.Sprintf("0.0.0.0:%s", lsOpts.InteropPort) + rapiAddr := "127.0.0.1:9001" + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + logCollector := NewLogCollector() + + var supervOpts []local.ProcessSupervisorOption + supervOpts = append(supervOpts, local.WithLowerPriorities(false)) + if credential != nil { + supervOpts = append(supervOpts, local.WithProcessCredential(credential)) + } + supv := local.NewProcessSupervisor(supervOpts...) 
+
+ fileUtil := utils.NewFileUtil()
+
+ invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT")
+ invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
+ if err != nil {
+ log.Fatalln(err)
+ }
+
+ raptorApp, err := rie.Run(
+ rapiAddr, supv, fileUtil, logCollector,
+ )
+ if err != nil {
+ log.Errorf("failed to start managed runtime application: %v", err)
+ return
+ }
+
+ initReq, err := rie.GetInitRequestMessage(fileUtil, os.Args)
+ if err != nil {
+ log.Errorf("could not build initialization parameters: %v", err)
+ return
+ }
+
+ // HACK(gregfurman): the account ID is normally expected via the undocumented AWS_ACCOUNT_ID env var, so set it explicitly from the LocalStack options instead.
+ initReq.AccountID = lsOpts.AccountId
+ initReq.FunctionARN = fmt.Sprintf("arn:aws:lambda:%s:%s:function:%s:%s", initReq.AwsRegion, initReq.AccountID, initReq.TaskName, initReq.FunctionVersion)
+ // AWS_LAMBDA_FUNCTION_TIMEOUT is given in seconds; convert it to the model's duration type.
+ initReq.InvokeTimeout = model.DurationMS(time.Duration(invokeTimeoutSeconds) * time.Second)
+
+ runtimeAPIAddr := raptorApp.RuntimeAPIAddrPort()
+
+ rieHandler, err := NewInvokeHandler(
+ *lsOpts, initReq, raptorApp, logCollector,
+ )
+ if err != nil {
+ log.Fatalln("Failed to create RIE handler:", err)
+ }
+
+ if err := rieHandler.Init(); err != nil {
+ log.Warnln("INIT failed:", err)
+ }
+
+ server, err := rie.StartServer(raptorApp, rieHandler, rieAddr, sigChan)
+ if err != nil {
+ log.Fatalln("Proxy ListenAndServe error:", err)
+ }
+
+ // go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)
+
+ log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddr.String())
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ if err := waitForRuntimeAPI(ctx, runtimeAPIAddr.String()); err != nil {
+ log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddr.String(), err.Error())
+ }
+ cancel()
+
+ log.Debugln("Completed initialization of runtime. Sending status ready to LocalStack.")
+ if err := adapter.SendStatus(Ready, []byte{}); err != nil {
+ log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
+ }
+
+ select {
+ case <-server.Done():
+ if err := server.Err(); err != nil {
+ log.Warnln("RIE server stopped:", err)
+ os.Exit(1)
+ }
+ case <-raptorApp.Done():
+ if err := raptorApp.Err(); err != nil {
+ log.Errorln("Runtime error:", err)
+ }
+ }
+
+}
diff --git a/cmd/localstack/run_standard.go b/cmd/localstack/run_standard.go
new file mode 100644
index 0000000..c5b18e1
--- /dev/null
+++ b/cmd/localstack/run_standard.go
@@ -0,0 +1,139 @@
+package main
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "time"
+
+ "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
+ "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
+ log "github.com/sirupsen/logrus"
+)
+
+func runStandard(lsOpts *LsOpts) {
+ configureStandardLogger(lsOpts.InitLogLevel)
+
+ // xRayLogLevel := getXRayLogLevel(lsOpts.InitLogLevel)
+
+ payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize)
+ if err != nil {
+ log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE")
+ }
+ interop.MaxPayloadSize = payloadSize
+
+ // parse CLI args
+ bootstrap, handler := getBootstrap(os.Args)
+
+ // Switch to non-root user and drop root privileges
+ if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" {
+ uid := 993
+ gid := 990
+ AddUser(lsOpts.User, uid, gid)
+ if err := os.Chown("/tmp", uid, gid); err != nil {
+ log.Warnln("Could not change owner of directory /tmp:", err)
+ }
+ UserLogger().Debugln("Process running as root user.")
+ err := DropPrivileges(lsOpts.User)
+ if err != nil {
+ log.Warnln("Could not drop root privileges.", err)
+ } else {
+ UserLogger().Debugln("Process running as non-root user.")
+ }
+ }
+
+ // file watcher for hot-reloading
+ fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())
+
+ logCollector := NewLogCollector()
+ localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
+ tracer := NewLocalStackTracer()
+
+ // build sandbox
+ sandbox := rapidcore.
+ NewSandboxBuilder().
+ //SetTracer(tracer).
+ AddShutdownFunc(func() {
+ log.Debugln("Stopping file watcher")
+ cancelFileWatcher()
+ }).
+ SetExtensionsFlag(true).
+ SetInitCachingFlag(true).
+ SetLogsEgressAPI(localStackLogsEgressApi).
+ SetTracer(tracer)
+
+ // Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
+ // We need to ensure the runtime server is up before the INIT phase,
+ // but this envar is only set after the InitHandler is called.
+ runtimeAPIAddress := "127.0.0.1:9001" + sandbox.SetRuntimeAPIAddress(runtimeAPIAddress) + + // Initialize X-Ray daemon + d := doInitDaemon( + lsOpts.LocalstackIP, + lsOpts.EdgePort, + lsOpts.EnableXRayTelemetry == "1", + lsOpts.InitLogLevel, + ) + + sandbox.AddShutdownFunc(func() { + log.Debugln("Shutting down xray daemon") + d.stop() + log.Debugln("Flushing segments in xray daemon") + d.close() + }) + + defaultInterop := sandbox.DefaultInteropServer() + interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector) + sandbox.SetInteropServer(interopServer) + if len(handler) > 0 { + sandbox.SetHandler(handler) + } + exitChan := make(chan struct{}) + sandbox.AddShutdownFunc(func() { + exitChan <- struct{}{} + }) + + // initialize all flows and start runtime API + sandboxContext, internalStateFn := sandbox.Create() + // Populate our custom interop server + interopServer.SetSandboxContext(sandboxContext) + interopServer.SetInternalStateGetter(internalStateFn) + + // get timeout + invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing + invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv) + if err != nil { + log.Fatalln(err) + } + go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy) + + log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddress) + // Fixes https://github.com/localstack/localstack/issues/12680 + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := waitForRuntimeAPI(ctx, runtimeAPIAddress); err != nil { + log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddress, err.Error()) + } + cancel() + + // start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the + // notification channels and status fields are properly initialized before `AwaitInitialized` + log.Debugln("Starting runtime init.") + InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init + + log.Debugln("Awaiting initialization of runtime init.") + if err := interopServer.delegate.AwaitInitialized(); err != nil { + // Error cases: ErrInitDoneFailed or ErrInitResetReceived + log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.") + // NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the + // callback SendInitErrorResponse because it contains the correct error response payload. + return + } + + log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.") + if err := interopServer.localStackAdapter.SendStatus(Ready, []byte{}); err != nil { + log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". 
Exiting.") + } + + <-exitChan +} diff --git a/cmd/localstack/server.go b/cmd/localstack/server.go new file mode 100644 index 0000000..16dee89 --- /dev/null +++ b/cmd/localstack/server.go @@ -0,0 +1,211 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "sync" + "time" + + rie "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/interop" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/invoke" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/logging" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/model" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapid" + rapidmodel "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapid/model" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/raptor" + + log "github.com/sirupsen/logrus" +) + +type adaptedInvokeRequest struct { + interop.InvokeRequest + + inReq InvokeRequest + maxPayloadSize int64 +} + +func NewAdaptedInvokeRequest(r *http.Request, w http.ResponseWriter, invokeReq InvokeRequest, maxPayloadSize int64) *adaptedInvokeRequest { + // HACK(gregfurman): We use the original request to construct a new interop.InvokeRequest via struct embedding. We then compose + // a new adaptedInvokeRequest type (that implements interop.InvokeRequest iface) that is adapted to suit our LocalStack internals. + + dummyReq := r.Clone(context.Background()) + dummyReq.Body = io.NopCloser(strings.NewReader(invokeReq.Payload)) + + internalReq := rie.NewRieInvokeRequest(dummyReq, w) + + return &adaptedInvokeRequest{ + InvokeRequest: internalReq, + inReq: invokeReq, + maxPayloadSize: int64(maxPayloadSize), + } +} + +func (r *adaptedInvokeRequest) MaxPayloadSize() int64 { + return r.maxPayloadSize +} + +func (r *adaptedInvokeRequest) InvokeId() string { + return r.inReq.InvokeId +} + +func (r *adaptedInvokeRequest) TraceId() string { + return r.inReq.TraceId +} + +//-------------------------------------------------------------------------------------------- + +type InvokeHandler struct { + app *raptor.App + initRequest model.InitRequestMessage + doInit func() rapidmodel.AppError + + upstreamEndpoint string + logCollector *LogCollector + + maxPayloadSize int64 +} + +func NewInvokeHandler(lsOpts LsOpts, init model.InitRequestMessage, app *raptor.App, logCollector *LogCollector) (*InvokeHandler, error) { + payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize) + if err != nil { + log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE") + } + + h := &InvokeHandler{ + initRequest: init, + app: app, + maxPayloadSize: int64(payloadSize), + upstreamEndpoint: lsOpts.RuntimeEndpoint, + logCollector: logCollector, + } + + h.doInit = sync.OnceValue(func() rapidmodel.AppError { + initCtx, cancel := context.WithTimeout(context.Background(), time.Duration(h.initRequest.InitTimeout)) + defer cancel() + + dummyInitMetrics := rapid.NewInitMetrics(nil) + res := h.app.Init(initCtx, &h.initRequest, dummyInitMetrics) + return res + }) + + return h, nil +} + +func (h *InvokeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" || r.URL.Path != "/invoke" { + http.Error(w, "Not found", 
http.StatusNotFound) + return + } + + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + + var invokeReq InvokeRequest + if err := json.Unmarshal(bodyBytes, &invokeReq); err != nil { + http.Error(w, "Invalid invoke request", http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + + go h.invoke(r, invokeReq) +} + +func (h *InvokeHandler) Init() rapidmodel.AppError { + return h.doInit() +} + +func (h *InvokeHandler) invoke(r *http.Request, invokeReq InvokeRequest) { + recorder := httptest.NewRecorder() + + defer func() { + go h.sendUpstreamCallbacks(invokeReq.InvokeId, recorder.Body.Bytes(), recorder.Code) + }() + + if err := h.doInit(); err != nil { + log.Errorf("init failed: %v", err) + h.respondWithError(recorder, err) + return + } + + adaptedInvokeReq := NewAdaptedInvokeRequest(r, recorder, invokeReq, int64(h.maxPayloadSize)) + + ctx := logging.WithInvokeID(context.Background(), adaptedInvokeReq.InvokeID()) + metrics := invoke.NewInvokeMetrics(nil, &noOpCounter{}) + metrics.AttachInvokeRequest(adaptedInvokeReq) + + appErr, responseSent := h.app.Invoke(ctx, adaptedInvokeReq, metrics) + if appErr != nil { + log.Errorf("invoke failed: %v", appErr) + if !responseSent { + h.respondWithError(recorder, appErr) + } + } +} + +func (h *InvokeHandler) respondWithError(w http.ResponseWriter, err rapidmodel.AppError) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(err.ReturnCode()) + w.Write([]byte(err.ErrorDetails())) +} + +func (h *InvokeHandler) sendUpstreamCallbacks(invokeID string, responseBody []byte, statusCode int) { + if invokeID == "" { + return + } + + if h.logCollector != nil { + if logs := h.logCollector.getLogs(); logs.Logs != "" { + serializedLogs, _ := json.Marshal(logs) + _, err := http.Post( + h.upstreamEndpoint+"/invocations/"+invokeID+"/logs", + "application/json", + bytes.NewReader(serializedLogs), + ) + if err != nil { + log.Errorf("Failed to send logs upstream: %v", err) + } + } + } + + var errResp map[string]any + isError := false + if json.Unmarshal(responseBody, &errResp) == nil { + _, hasErrorType := errResp["errorType"] + _, hasErrorMessage := errResp["errorMessage"] + isError = hasErrorType || hasErrorMessage + } + + if statusCode >= 400 { + isError = true + } + + endpoint := "/response" + if isError { + endpoint = "/error" + } + + _, err := http.Post( + h.upstreamEndpoint+"/invocations/"+invokeID+endpoint, + "application/json", + bytes.NewReader(responseBody), + ) + if err != nil { + log.Errorf("Failed to send %s upstream: %v", endpoint, err) + } +} + +type noOpCounter struct{} + +func (c *noOpCounter) AddInvoke(_ uint64) {} diff --git a/internal/lambda-managed-instances/aws-lambda-rie/entrypoint.go b/internal/lambda-managed-instances/aws-lambda-rie/entrypoint.go new file mode 100644 index 0000000..877cb10 --- /dev/null +++ b/internal/lambda-managed-instances/aws-lambda-rie/entrypoint.go @@ -0,0 +1,109 @@ +package rie + +import ( + "context" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "time" + + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal" + rieinvoke "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/invoke" + rieTelemetry "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie/internal/telemetry" + + 
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/interop" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/invoke" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/invoke/timeout" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/model" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapid" + rapidmodel "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapid/model" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/raptor" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/servicelogs" + supvmodel "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/supervisor/model" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/utils" +) + +func NewRieInvokeRequest(r *http.Request, w http.ResponseWriter) interop.InvokeRequest { + return rieinvoke.NewRieInvokeRequest(r, w) +} + +func GetInitRequestMessage(fileUtil utils.FileUtil, args []string) (model.InitRequestMessage, rapidmodel.AppError) { + return internal.GetInitRequestMessage(fileUtil, args) +} + +func Run( + rapiAddr string, + supv supvmodel.ProcessSupervisor, + fileUtil utils.FileUtil, + logWriter io.Writer, +) (*raptor.App, error) { + runtimeAPIAddr, err := internal.ParseAddr(rapiAddr, "127.0.0.1:9001") + if err != nil { + return nil, fmt.Errorf("invalid runtime API address: %w", err) + } + + telemetryAPIRelay := rieTelemetry.NewRelay() + eventsAPI := rieTelemetry.NewEventsAPI(telemetryAPIRelay) + + responderFactoryFunc := func(_ context.Context, invokeReq interop.InvokeRequest) invoke.InvokeResponseSender { + return rieinvoke.NewResponder(invokeReq) + } + invokeRouter := invoke.NewInvokeRouter(rapid.MaxIdleRuntimesQueueSize, eventsAPI, responderFactoryFunc, timeout.NewRecentCache()) + + deps := rapid.Dependencies{ + EventsAPI: eventsAPI, + LogsEgressAPI: rieTelemetry.NewLogsEgress(telemetryAPIRelay, io.MultiWriter(logWriter, os.Stdout)), + TelemetrySubscriptionAPI: rieTelemetry.NewSubscriptionAPI(telemetryAPIRelay, eventsAPI, eventsAPI), + Supervisor: supv, + RuntimeAPIAddrPort: runtimeAPIAddr, + FileUtils: fileUtil, + InvokeRouter: invokeRouter, + } + + raptorApp, err := raptor.StartApp(deps, "", noOpLogger{}) + if err != nil { + return nil, fmt.Errorf("could not start runtime api server: %w", err) + } + + return raptorApp, nil +} + +// noOpLogger implements the raptorLogger interface with no-op methods +type noOpLogger struct{} + +func (n noOpLogger) Log(_ servicelogs.Operation, _ time.Time, _ []servicelogs.Property, _ []servicelogs.Dimension, _ []servicelogs.Metric) { +} + +func (n noOpLogger) SetInitData(_ interop.InitStaticDataProvider) {} + +func (n noOpLogger) Close() error { return nil } + +func StartServer( + raptorApp *raptor.App, + rieApp http.Handler, + rieAddr string, + sigCh chan os.Signal, +) (*raptor.Server, error) { + emulatorAddr, err := internal.ParseAddr(rieAddr, "0.0.0.0:8080") + if err != nil { + return nil, fmt.Errorf("invalid RIE address: %w", err) + } + + s, err := raptor.StartServer(raptorApp, rieApp, &raptor.TCPAddress{AddrPort: emulatorAddr}) + if err != nil { + return nil, fmt.Errorf("could not start RIE server: %w", err) + } + slog.Debug("RIE server started") + + go func() { + <-raptorApp.Done() + s.Shutdown(raptorApp.Err()) 
+ }()
+
+ s.AttachShutdownSignalHandler(sigCh)
+
+ return s, nil
+}