Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 68 additions & 140 deletions cmd/localstack/main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
// main entrypoint of init
// initial structure based upon /cmd/aws-lambda-rie/main.go
package main

import (
"context"
"io"
"log/slog"
"os"
"runtime/debug"
"strconv"
"strings"
"time"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
mlogging "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/logging"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/rapidcore/env"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -105,160 +103,90 @@ func main() {
UnsetLsEnvs()

// set up logging following the Logrus logging levels: https://github.com/sirupsen/logrus#level-logging
configureLogging(lsOpts.InitLogLevel)

// Download code archives
if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
log.Fatal("Failed to download code archives: " + err.Error())
}

if err := AdaptFilesystemPermissions(lsOpts.ChmodPaths); err != nil {
log.Warnln("Could not change file mode of code directories:", err)
}

// Check if running in managed mode
if _, ok := os.LookupEnv(env.AWS_LAMBDA_MAX_CONCURRENCY); ok {
runManaged(lsOpts)
return
}

runStandard(lsOpts)
}

func doInitDaemon(addr, port string, enable bool, lvl string) *Daemon {
endpoint := "http://" + addr + ":" + port
xrayConfig := initConfig(endpoint, getXRayLogLevel(lvl))
d := initDaemon(xrayConfig, enable)
runDaemon(d)
return d
}

func configureManagedLogger(logLevel string) {
level := slogLevelFromString(logLevel)
slog.SetDefault(mlogging.CreateNewLogger(level, io.Writer(os.Stderr)))
}

func configureStandardLogger(logLevel string) {
log.SetOutput(os.Stderr)
}

func configureLogging(logLevel string) {
log.SetReportCaller(true)
// https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html
xRayLogLevel := "info"
switch lsOpts.InitLogLevel {
switch logLevel {
case "trace":
log.SetFormatter(&log.JSONFormatter{})
log.SetLevel(log.TraceLevel)
xRayLogLevel = "debug"
case "debug":
log.SetLevel(log.DebugLevel)
xRayLogLevel = "debug"
case "info":
log.SetLevel(log.InfoLevel)
case "warn":
log.SetLevel(log.WarnLevel)
xRayLogLevel = "warn"
case "error":
log.SetLevel(log.ErrorLevel)
xRayLogLevel = "error"
case "fatal":
log.SetLevel(log.FatalLevel)
xRayLogLevel = "error"
case "panic":
log.SetLevel(log.PanicLevel)
xRayLogLevel = "error"
default:
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
}
}

// patch MaxPayloadSize
payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize)
if err != nil {
log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE")
}
interop.MaxPayloadSize = payloadSize

// download code archive if env variable is set
if err := DownloadCodeArchives(lsOpts.CodeArchives); err != nil {
log.Fatal("Failed to download code archives: " + err.Error())
}

if err := AdaptFilesystemPermissions(lsOpts.ChmodPaths); err != nil {
log.Warnln("Could not change file mode of code directories:", err)
}

// parse CLI args
bootstrap, handler := getBootstrap(os.Args)

// Switch to non-root user and drop root privileges
if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" {
uid := 993
gid := 990
AddUser(lsOpts.User, uid, gid)
if err := os.Chown("/tmp", uid, gid); err != nil {
log.Warnln("Could not change owner of directory /tmp:", err)
}
UserLogger().Debugln("Process running as root user.")
err := DropPrivileges(lsOpts.User)
if err != nil {
log.Warnln("Could not drop root privileges.", err)
} else {
UserLogger().Debugln("Process running as non-root user.")
}
}

// file watcher for hot-reloading
fileWatcherContext, cancelFileWatcher := context.WithCancel(context.Background())

logCollector := NewLogCollector()
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
tracer := NewLocalStackTracer()

// build sandbox
sandbox := rapidcore.
NewSandboxBuilder().
//SetTracer(tracer).
AddShutdownFunc(func() {
log.Debugln("Stopping file watcher")
cancelFileWatcher()
}).
SetExtensionsFlag(true).
SetInitCachingFlag(true).
SetLogsEgressAPI(localStackLogsEgressApi).
SetTracer(tracer)

// Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
// We need to ensure the runtime server is up before the INIT phase,
// but this envar is only set after the InitHandler is called.
runtimeAPIAddress := "127.0.0.1:9001"
sandbox.SetRuntimeAPIAddress(runtimeAPIAddress)

// xray daemon
endpoint := "http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort
xrayConfig := initConfig(endpoint, xRayLogLevel)
d := initDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
sandbox.AddShutdownFunc(func() {
log.Debugln("Shutting down xray daemon")
d.stop()
log.Debugln("Flushing segments in xray daemon")
d.close()
})
runDaemon(d) // async

defaultInterop := sandbox.DefaultInteropServer()
interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector)
sandbox.SetInteropServer(interopServer)
if len(handler) > 0 {
sandbox.SetHandler(handler)
}
exitChan := make(chan struct{})
sandbox.AddShutdownFunc(func() {
exitChan <- struct{}{}
})

// initialize all flows and start runtime API
sandboxContext, internalStateFn := sandbox.Create()
// Populate our custom interop server
interopServer.SetSandboxContext(sandboxContext)
interopServer.SetInternalStateGetter(internalStateFn)

// get timeout
invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT") // TODO: collect all AWS_* env parsing
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
if err != nil {
log.Fatalln(err)
}
go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)

log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddress)
// Fixes https://github.com/localstack/localstack/issues/12680
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
if err := waitForRuntimeAPI(ctx, runtimeAPIAddress); err != nil {
log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddress, err.Error())
}
cancel()

// start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
// notification channels and status fields are properly initialized before `AwaitInitialized`
log.Debugln("Starting runtime init.")
InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init

log.Debugln("Awaiting initialization of runtime init.")
if err := interopServer.delegate.AwaitInitialized(); err != nil {
// Error cases: ErrInitDoneFailed or ErrInitResetReceived
log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.")
// NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
// callback SendInitErrorResponse because it contains the correct error response payload.
return
func slogLevelFromString(logLevel string) slog.Level {
switch logLevel {
case "trace", "debug":
return slog.LevelDebug
case "info":
return slog.LevelInfo
case "warn":
return slog.LevelWarn
case "error", "fatal", "panic":
return slog.LevelError
default:
return slog.LevelInfo
}
}

log.Debugln("Completed initialization of runtime init. Sending status ready to LocalStack.")
if err := interopServer.localStackAdapter.SendStatus(Ready, []byte{}); err != nil {
log.Fatalln("Failed to send status ready to LocalStack " + err.Error() + ". Exiting.")
func getXRayLogLevel(initLogLevel string) string {
switch initLogLevel {
case "trace", "debug":
return "debug"
case "warn":
return "warn"
case "error", "fatal", "panic":
return "error"
default:
return "info"
}

<-exitChan
}
146 changes: 146 additions & 0 deletions cmd/localstack/run_managed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"time"

rie "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/aws-lambda-rie"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/model"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/supervisor/local"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda-managed-instances/utils"
log "github.com/sirupsen/logrus"
)

func runManaged(lsOpts *LsOpts) {
configureManagedLogger(lsOpts.InitLogLevel)

// Initialize X-Ray daemon
d := doInitDaemon(
lsOpts.LocalstackIP,
lsOpts.EdgePort,
lsOpts.EnableXRayTelemetry == "1",
lsOpts.InitLogLevel,
)

defer func() {
log.Debugln("Shutting down xray daemon")
d.stop()
log.Debugln("Flushing segments in xray daemon")
d.close()
}()

var credential *syscall.Credential
if IsRootUser() && lsOpts.User != "" && lsOpts.User != "root" {
uid := 993
gid := 990
AddUser(lsOpts.User, uid, gid)
if err := os.Chown("/tmp", uid, gid); err != nil {
log.Warnln("Could not change owner of directory /tmp:", err)
}

credential = &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
}

UserLogger().Debugln("Configured runtime to run as non-root user:", lsOpts.User)
}

adapter := LocalStackAdapter{
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
RuntimeId: lsOpts.RuntimeId,
}

rieAddr := fmt.Sprintf("0.0.0.0:%s", lsOpts.InteropPort)
rapiAddr := "127.0.0.1:9001"

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

logCollector := NewLogCollector()

var supervOpts []local.ProcessSupervisorOption
supervOpts = append(supervOpts, local.WithLowerPriorities(false))
if credential != nil {
supervOpts = append(supervOpts, local.WithProcessCredential(credential))
}
supv := local.NewProcessSupervisor(supervOpts...)

fileUtil := utils.NewFileUtil()

invokeTimeoutEnv := GetEnvOrDie("AWS_LAMBDA_FUNCTION_TIMEOUT")
invokeTimeoutSeconds, err := strconv.Atoi(invokeTimeoutEnv)
if err != nil {
log.Fatalln(err)
}

raptorApp, err := rie.Run(
rapiAddr, supv, fileUtil, logCollector,
)
if err != nil {
log.Errorf("failed with error: %w", err)

Check failure on line 86 in cmd/localstack/run_managed.go

View workflow job for this annotation

GitHub Actions / test

github.com/sirupsen/logrus.Errorf does not support error-wrapping directive %w
return
}

initReq, err := rie.GetInitRequestMessage(fileUtil, os.Args)
if err != nil {
log.Errorf("could not build initialization parameters: %w", err)

Check failure on line 92 in cmd/localstack/run_managed.go

View workflow job for this annotation

GitHub Actions / test

github.com/sirupsen/logrus.Errorf does not support error-wrapping directive %w
return
}

// HACK(gregfurman): expects the account to be set via the AWS_ACCOUNT_ID env var which is undocumented
initReq.AccountID = lsOpts.AccountId
initReq.FunctionARN = fmt.Sprintf("arn:aws:lambda:%s:%s:function:%s:%s", initReq.AwsRegion, initReq.AccountID, initReq.TaskName, initReq.FunctionVersion)
// Convert seconds to time.Duration (invokeTimeoutSeconds is in seconds, need to convert to Duration)
initReq.InvokeTimeout = model.DurationMS(time.Duration(invokeTimeoutSeconds) * time.Second)

runtimeAPIAddr := raptorApp.RuntimeAPIAddrPort()

rieHandler, err := NewInvokeHandler(
*lsOpts, initReq, raptorApp, logCollector,
)
if err != nil {
log.Fatal("creating RIE handler error:", err)
}

if err := rieHandler.Init(); err != nil {
log.Warn("INIT failed", "err", err)
}

server, err := rie.StartServer(raptorApp, rieHandler, rieAddr, sigChan)
if err != nil {
log.Fatal("Proxy ListenAndServe error:", err)
}

// go RunHotReloadingListener(interopServer, lsOpts.HotReloadingPaths, fileWatcherContext, lsOpts.FileWatcherStrategy)

log.Debugf("Awaiting initialization of runtime api at %s.", runtimeAPIAddr.String())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
if err := waitForRuntimeAPI(ctx, runtimeAPIAddr.String()); err != nil {
log.Fatalf("Lambda Runtime API server at %s did not come up in 30s, with error %s", runtimeAPIAddr.String(), err.Error())
}
cancel()

log.Debugln("Completed initialization of runtime. Sending status ready to LocalStack.")
if err := adapter.SendStatus(Ready, []byte{}); err != nil {
log.Fatalln("Failed to send status ready to LocalStack", err, ". Exiting.")
}

select {
case <-server.Done():
if err := server.Err(); err != nil {
log.Warn("rie server stopped", "err", err)
os.Exit(1)
}
case <-raptorApp.Done():
if err := raptorApp.Err(); err != nil {
log.Errorln("Runtime error:", err)
}
}

}
Loading
Loading