From 034856cef6cc4489f3edb65c020123f5f3e66d70 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Fri, 12 May 2023 11:09:35 +0100 Subject: [PATCH 01/21] Pass config for collector tracing to sats --- e2core/backend/satbackend/orchestrator.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/e2core/backend/satbackend/orchestrator.go b/e2core/backend/satbackend/orchestrator.go index 938c95e8..83c8e74c 100644 --- a/e2core/backend/satbackend/orchestrator.go +++ b/e2core/backend/satbackend/orchestrator.go @@ -149,6 +149,10 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) { "SAT_HTTP_PORT="+port, "SAT_CONTROL_PLANE="+o.opts.ControlPlane, "SAT_CONNECTIONS="+connectionsEnv, + "SAT_TRACER_TYPE=collector", + "SAT_TRACER_SERVICENAME=e2core_bebby-"+port, + "SAT_TRACER_PROBABILITY=1", + "SAT_TRACER_COLLECTOR_ENDPOINT=http://host.docker.internal:4317", ) if err != nil { ll.Err(err).Str("moduleFQMN", module.FQMN).Msg("exec.Run failed for sat instance") From 9c6a53646ecfc6d8acc3230b4898aab1cd66c184 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:17:17 +0100 Subject: [PATCH 02/21] Add curl to the e2core image --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index f6168d60..6bedfb15 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,7 +16,8 @@ RUN groupadd -g 999 e2core && \ chown -R e2core /home/e2core && \ chmod -R 700 /home/e2core RUN apt-get update \ - && apt-get install -y ca-certificates + && apt-get install -y ca-certificates \ + && apt-get install -y curl # e2core binary COPY --from=builder /go/src/github.com/suborbital/e2core/.bin/e2core /usr/local/bin/ From 14feee6c8d4bd0ad8c70e44560633376d1cece88 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:19:21 +0100 Subject: [PATCH 03/21] Add the process ID to the returned arguments, use it in logs --- e2core/backend/satbackend/exec/exec.go | 6 ++--- e2core/backend/satbackend/orchestrator.go | 33 ++++++++++++++++++----- e2core/backend/satbackend/watcher.go | 4 ++- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/e2core/backend/satbackend/exec/exec.go b/e2core/backend/satbackend/exec/exec.go index a303d2c1..9accfd86 100644 --- a/e2core/backend/satbackend/exec/exec.go +++ b/e2core/backend/satbackend/exec/exec.go @@ -15,7 +15,7 @@ type WaitFunc func() error // Run runs a command, outputting to terminal and returning the full output and/or error // a channel is returned which, when sent on, will terminate the process that was started -func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, WaitFunc, error) { +func Run(cmd []string, env ...string) (string, int, context.CancelCauseFunc, WaitFunc, error) { procUUID := uuid.New().String() uuidEnv := fmt.Sprintf("%s_UUID=%s", strings.ToUpper(cmd[0]), procUUID) env = append(env, uuidEnv) @@ -33,10 +33,10 @@ func Run(cmd []string, env ...string) (string, context.CancelCauseFunc, WaitFunc err := command.Start() if err != nil { - return "", nil, nil, errors.Wrap(err, "command.Start()") + return "", -1, nil, nil, errors.Wrap(err, "command.Start()") } - return procUUID, cxl, command.Wait, nil + return procUUID, command.Process.Pid, cxl, command.Wait, nil } // this is unused but we may want to do logging-to-speficig-directory some time in the diff --git a/e2core/backend/satbackend/orchestrator.go b/e2core/backend/satbackend/orchestrator.go index 83c8e74c..cdb67b1c 100644 --- a/e2core/backend/satbackend/orchestrator.go +++ b/e2core/backend/satbackend/orchestrator.go @@ -144,7 +144,7 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) { } // repeat forever in case the command does error out - processUUID, cxl, wait, err := exec.Run( + processUUID, pid, cxl, wait, err := exec.Run( cmd, "SAT_HTTP_PORT="+port, "SAT_CONTROL_PLANE="+o.opts.ControlPlane, @@ -162,20 +162,41 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) { go func() { err := wait() if err != nil { - ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("calling waitfunc for the module failed") + ll.Err(err). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("waitfunc returned with an error") } + ll.Info(). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("adding port to dead list") + err = satWatcher.addToDead(port) if err != nil { - ll.Err(err).Str("moduleFQMN", module.FQMN).Str("port", port).Msg("adding the port to the dead list") + ll.Err(err). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("adding the port to the dead list failed") } - ll.Info().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("added port to dead list") }() - satWatcher.add(module.FQMN, port, processUUID, cxl) + satWatcher.add(module.FQMN, port, processUUID, pid, cxl) - ll.Debug().Str("moduleFQMN", module.FQMN).Str("port", port).Msg("successfully started sat") + ll.Info(). + Str("moduleFQMN", module.FQMN). + Str("port", port). + Int("pid", pid). + Str("uuid", processUUID). + Msg("successfully started sat") } // we want to max out at 8 threads per instance diff --git a/e2core/backend/satbackend/watcher.go b/e2core/backend/satbackend/watcher.go index 10293ebb..aeea4e1a 100644 --- a/e2core/backend/satbackend/watcher.go +++ b/e2core/backend/satbackend/watcher.go @@ -41,6 +41,7 @@ type instance struct { fqmn string metrics *MetricsResponse uuid string + pid int cxl context.CancelCauseFunc } @@ -76,7 +77,7 @@ func (w *watcher) addToDead(port string) error { } // add inserts a new instance to the watched pool. -func (w *watcher) add(fqmn, port, uuid string, cxl context.CancelCauseFunc) { +func (w *watcher) add(fqmn, port, uuid string, pid int, cxl context.CancelCauseFunc) { w.log.Info().Str("port", port).Str("fqmn", fqmn).Msg("adding one to the waitgroup port") w.instancesRunning.Add(1) @@ -88,6 +89,7 @@ func (w *watcher) add(fqmn, port, uuid string, cxl context.CancelCauseFunc) { w.instances[port] = &instance{ fqmn: fqmn, uuid: uuid, + pid: pid, cxl: cxl, } } From 3562751c3212d7ca757b7127c6a70a164dab96e8 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:21:56 +0100 Subject: [PATCH 04/21] Adjust how often things happen and how much timeouts we have --- e2core/backend/satbackend/orchestrator.go | 2 +- e2core/command/start.go | 2 +- e2core/syncer/syncer.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/e2core/backend/satbackend/orchestrator.go b/e2core/backend/satbackend/orchestrator.go index cdb67b1c..1d58839f 100644 --- a/e2core/backend/satbackend/orchestrator.go +++ b/e2core/backend/satbackend/orchestrator.go @@ -51,7 +51,7 @@ func (o *Orchestrator) Start() error { var err error - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(5 * time.Second) loop: for { select { diff --git a/e2core/command/start.go b/e2core/command/start.go index 66db9a57..43fb55e6 100644 --- a/e2core/command/start.go +++ b/e2core/command/start.go @@ -26,7 +26,7 @@ import ( ) const ( - shutdownWaitTime = time.Second * 3 + shutdownWaitTime = time.Second * 10 ) func Start() *cobra.Command { diff --git a/e2core/syncer/syncer.go b/e2core/syncer/syncer.go index 29872b9d..fe4da037 100644 --- a/e2core/syncer/syncer.go +++ b/e2core/syncer/syncer.go @@ -66,7 +66,7 @@ func (s *Syncer) Start() error { return errors.Wrap(err, "failed to Do sync job") } - s.sched.Schedule(scheduler.Every(1, func() scheduler.Job { return scheduler.NewJob("sync", nil) })) + s.sched.Schedule(scheduler.Every(45, func() scheduler.Job { return scheduler.NewJob("sync", nil) })) return nil } From 4c6dd1e7df5a8d69bcd1c6d32f60a46bdd167eb0 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:26:40 +0100 Subject: [PATCH 05/21] Break up parsing environment variables and constructing options into two steps --- e2core/command/mod_start.go | 20 ++++++++++++++++---- sat/main.go | 11 ++++++++++- sat/sat/config.go | 13 ++++--------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/e2core/command/mod_start.go b/e2core/command/mod_start.go index 4033dc62..bf3eb56f 100644 --- a/e2core/command/mod_start.go +++ b/e2core/command/mod_start.go @@ -11,11 +11,13 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/sethvargo/go-envconfig" "github.com/spf13/cobra" "github.com/suborbital/e2core/e2core/release" "github.com/suborbital/e2core/sat/sat" "github.com/suborbital/e2core/sat/sat/metrics" + satOptions "github.com/suborbital/e2core/sat/sat/options" ) func ModStart() *cobra.Command { @@ -30,13 +32,23 @@ func ModStart() *cobra.Command { path = args[0] } + opts, err := satOptions.Resolve(envconfig.OsLookuper()) + if err != nil { + return errors.Wrap(err, "options.Resolve") + } + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix l := zerolog.New(os.Stderr).With(). Timestamp(). - Str("command", "mod start"). - Logger().Level(zerolog.InfoLevel) - - config, err := sat.ConfigFromModuleArg(l, path) + Str("port", string(opts.Port)). + Str("procuuid", string(opts.ProcUUID)). + Int("pid", os.Getpid()). + Int("ppid", os.Getppid()). + Str("mode", "bebby"). + Str("fqmn", path). + Logger() + + config, err := sat.ConfigFromModuleArg(l, opts, path) if err != nil { return errors.Wrap(err, "failed to ConfigFromModuleArg") } diff --git a/sat/main.go b/sat/main.go index b4417aa9..4c52600a 100644 --- a/sat/main.go +++ b/sat/main.go @@ -10,20 +10,29 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "github.com/sethvargo/go-envconfig" "github.com/suborbital/e2core/sat/sat" "github.com/suborbital/e2core/sat/sat/metrics" + satOptions "github.com/suborbital/e2core/sat/sat/options" ) func main() { zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + + opts, err := satOptions.Resolve(envconfig.OsLookuper()) + if err != nil { + log.Fatalf("options.Resolve: %s", err) + } + logger := zerolog.New(os.Stderr).With(). Timestamp(). Str("service", "sat-module"). + Str("port", string(opts.Port)). Str("version", sat.SatDotVersion). Logger() - conf, err := sat.ConfigFromArgs(logger) + conf, err := sat.ConfigFromArgs(logger, opts) if err != nil { log.Fatal(err) } diff --git a/sat/sat/config.go b/sat/sat/config.go index debce8a2..d47ec739 100644 --- a/sat/sat/config.go +++ b/sat/sat/config.go @@ -11,7 +11,6 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" - "github.com/sethvargo/go-envconfig" "gopkg.in/yaml.v3" satOptions "github.com/suborbital/e2core/sat/sat/options" @@ -38,7 +37,7 @@ type Config struct { MetricsConfig satOptions.MetricsConfig } -func ConfigFromArgs(l zerolog.Logger) (*Config, error) { +func ConfigFromArgs(l zerolog.Logger, opts satOptions.Options) (*Config, error) { flag.Parse() args := flag.Args() @@ -48,17 +47,13 @@ func ConfigFromArgs(l zerolog.Logger) (*Config, error) { moduleArg := args[0] - return ConfigFromModuleArg(l, moduleArg) + return ConfigFromModuleArg(l, opts, moduleArg) } -func ConfigFromModuleArg(logger zerolog.Logger, moduleArg string) (*Config, error) { +func ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleArg string) (*Config, error) { var module *tenant.Module var FQMN fqmn.FQMN - - opts, err := satOptions.Resolve(envconfig.OsLookuper()) - if err != nil { - return nil, errors.Wrap(err, "ConfigFromModuleArg options.Resolve") - } + var err error // first, determine if we need to connect to a control plane controlPlane := "" From ae990942e7314e49e55964f71a50494344ddd01c Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:26:58 +0100 Subject: [PATCH 06/21] Point sat to the correct collector --- e2core/backend/satbackend/orchestrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2core/backend/satbackend/orchestrator.go b/e2core/backend/satbackend/orchestrator.go index 1d58839f..0951284c 100644 --- a/e2core/backend/satbackend/orchestrator.go +++ b/e2core/backend/satbackend/orchestrator.go @@ -152,7 +152,7 @@ func (o *Orchestrator) reconcileConstellation(syncer *syncer.Syncer) { "SAT_TRACER_TYPE=collector", "SAT_TRACER_SERVICENAME=e2core_bebby-"+port, "SAT_TRACER_PROBABILITY=1", - "SAT_TRACER_COLLECTOR_ENDPOINT=http://host.docker.internal:4317", + "SAT_TRACER_COLLECTOR_ENDPOINT=collector:4317", ) if err != nil { ll.Err(err).Str("moduleFQMN", module.FQMN).Msg("exec.Run failed for sat instance") From ad646253a63d2476713e75138fdacb4ee97bf9d9 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:27:26 +0100 Subject: [PATCH 07/21] Guard against a nil pointer on shutdown if source server doesn't exist --- e2core/command/start.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/e2core/command/start.go b/e2core/command/start.go index 43fb55e6..8f6f0825 100644 --- a/e2core/command/start.go +++ b/e2core/command/start.go @@ -116,8 +116,10 @@ func Start() *cobra.Command { return errors.Wrap(err, "srv.Shutdown") } - if err := sourceSrv.Shutdown(ctx); err != nil { - return errors.Wrap(err, "sourceSrv.Shutdown") + if sourceSrv != nil { + if err := sourceSrv.Shutdown(ctx); err != nil { + return errors.Wrap(err, "sourceSrv.Shutdown") + } } backend.Shutdown() From 1102d55972085a0be42c6f4f766d9493bf25d990 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:27:49 +0100 Subject: [PATCH 08/21] Fix an import shadow variable issue --- e2core/command/start.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2core/command/start.go b/e2core/command/start.go index 8f6f0825..5dd695d3 100644 --- a/e2core/command/start.go +++ b/e2core/command/start.go @@ -199,14 +199,14 @@ func setupSourceServer(logger zerolog.Logger, opts *options.Options) (*echo.Echo ll.Debug().Msg("creating sourceserver from bundle: " + opts.BundlePath) - server, err := sourceserver.FromBundle(opts.BundlePath) + sourceSrv, err := sourceserver.FromBundle(opts.BundlePath) if err != nil { return nil, errors.Wrap(err, "failed to sourceserver.FromBundle") } - server.HideBanner = true + sourceSrv.HideBanner = true - return server, nil + return sourceSrv, nil } // a nil server is ok if we don't need to run one From da8160d4e63f1a33b4a7a280f3c7c6f924876b37 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:28:44 +0100 Subject: [PATCH 09/21] Move putting a job on a syncer as the same step --- e2core/syncer/syncer.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/e2core/syncer/syncer.go b/e2core/syncer/syncer.go index fe4da037..3420be46 100644 --- a/e2core/syncer/syncer.go +++ b/e2core/syncer/syncer.go @@ -38,16 +38,15 @@ func New(opts *options.Options, logger zerolog.Logger, source system.Source) *Sy s := &Syncer{ sched: scheduler.New(), opts: opts, - } - - s.job = &syncJob{ - systemSource: source, - state: &system.State{}, - tenantIdents: make(map[string]int64), - overviews: make(map[string]*system.TenantOverview), - modules: make(map[string]tenant.Module), - log: logger.With().Str("module", "syncJob").Logger(), - lock: &sync.RWMutex{}, + job: &syncJob{ + systemSource: source, + state: &system.State{}, + tenantIdents: make(map[string]int64), + overviews: make(map[string]*system.TenantOverview), + modules: make(map[string]tenant.Module), + log: logger.With().Str("module", "syncJob").Logger(), + lock: &sync.RWMutex{}, + }, } s.sched.Register("sync", s.job) From b1beb8fdbf52e6ebf45bcfcbbceeffbce1fac5df Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:31:26 +0100 Subject: [PATCH 10/21] Add logger to pod --- foundation/bus/bus/bus.go | 3 ++- foundation/bus/bus/pod.go | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/foundation/bus/bus/bus.go b/foundation/bus/bus/bus.go index 19da6ae3..d663a7d3 100644 --- a/foundation/bus/bus/bus.go +++ b/foundation/bus/bus/bus.go @@ -87,7 +87,8 @@ func (b *Bus) Stop() error { } func (b *Bus) connectWithOpts(opts *podOpts) *Pod { - pod := newPod(b.bus.busChan, b.Tunnel, opts) + b.logger.Info().Msg("creating a new pod with the bus's Tunnel method. That one takes the hub on the bus, and calls the sendTunneledMessage") + pod := newPod(b.bus.busChan, b.Tunnel, opts, b.logger.With().Str("component", "pod").Logger()) b.bus.addPod(pod) diff --git a/foundation/bus/bus/pod.go b/foundation/bus/bus/pod.go index 673ed534..08673f1f 100644 --- a/foundation/bus/bus/pod.go +++ b/foundation/bus/bus/pod.go @@ -4,6 +4,8 @@ import ( "errors" "sync" "sync/atomic" + + "github.com/rs/zerolog" ) const ( @@ -41,6 +43,8 @@ type Pod struct { onFunc MsgFunc // the onFunc is called whenever a message is received onFuncLock sync.RWMutex + logger zerolog.Logger + messageChan MsgChan // messageChan is used to receive messages coming from the bus feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status busChan MsgChan // busChan is used to emit messages to the bus @@ -60,7 +64,7 @@ type podOpts struct { } // newPod creates a new Pod -func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) *Pod { +func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts, l zerolog.Logger) *Pod { p := &Pod{ onFuncLock: sync.RWMutex{}, messageChan: make(chan Message, defaultPodChanSize), @@ -70,6 +74,7 @@ func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) tunnelFunc: tunnel, opts: opts, dead: &atomic.Value{}, + logger: l, } // do some "delayed setup" From f24c27315147267c563dd887a7ab48bad9e53119 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:32:01 +0100 Subject: [PATCH 11/21] Make an error chan buffered --- foundation/bus/bus/hub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/foundation/bus/bus/hub.go b/foundation/bus/bus/hub.go index 14b77fc7..6bd63ee2 100644 --- a/foundation/bus/bus/hub.go +++ b/foundation/bus/bus/hub.go @@ -280,7 +280,7 @@ func (h *hub) addConnection(connection Connection, uuid, belongsTo string, inter Conn: connection, Pod: h.pod, Signaler: signaler, - ErrChan: make(chan error), + ErrChan: make(chan error, 1), BelongsTo: belongsTo, Interests: interests, Log: h.log, From 1cedf20dab8a3d9e1e9b4fce9f7d867579e6a267 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:32:58 +0100 Subject: [PATCH 12/21] Rework hub scan for failed messages loop --- foundation/bus/bus/hub.go | 56 ++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/foundation/bus/bus/hub.go b/foundation/bus/bus/hub.go index 6bd63ee2..5b4b0119 100644 --- a/foundation/bus/bus/hub.go +++ b/foundation/bus/bus/hub.go @@ -344,37 +344,49 @@ func (h *hub) connectionExists(uuid string) bool { // check for failed connections and clean them up func (h *hub) scanFailedMeshConnections() { ll := h.log.With().Str("method", "scanFailedMeshConnections").Logger() - for { - // we don't want to edit the `meshConnections` map while in the loop, so do it after - toRemove := make([]string, 0) - - // for each connection, check if it has errored or if its peer has withdrawn, - // and in either case close it and remove it from circulation - for _, conn := range h.meshConnections { - select { - case <-conn.ErrChan: - if err := conn.Close(); err != nil { - ll.Err(err).Str("connUUID", conn.UUID).Msg("failed to Close connection") - } - toRemove = append(toRemove, conn.UUID) - default: - if conn.Signaler.PeerWithdrawn() { + ll.Info().Msg("starting the loop to scan for failed mesh connections") + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + // ll.Info().Msg("starting loop") + // we don't want to edit the `meshConnections` map while in the loop, so do it after + toRemove := make([]string, 0) + + // for each connection, check if it has errored or if its peer has withdrawn, + // and in either case close it and remove it from circulation + for _, conn := range h.meshConnections { + select { + case <-conn.ErrChan: if err := conn.Close(); err != nil { - ll.Err(err).Str("connUUID", conn.UUID).Msg( - "failed to Close connection") + ll.Err(err).Str("connUUID", conn.UUID).Msg("failed to Close connection") } + ll.Warn().Str("conn-uuid", conn.UUID).Msg("adding this to removal") toRemove = append(toRemove, conn.UUID) + default: + // ll.Info().Str("conn-uuid", conn.UUID).Msg("no error came in, doing default") + if conn.Signaler.PeerWithdrawn() { + if err := conn.Close(); err != nil { + ll.Err(err).Str("connUUID", conn.UUID).Msg( + "failed to Close connection") + } + + ll.Warn().Str("conn-uuid", conn.UUID).Msg("peer has withdrawn, so removing it from here") + + toRemove = append(toRemove, conn.UUID) + } } } - } - for _, uuid := range toRemove { - h.removeMeshConnection(uuid) + for _, uuid := range toRemove { + ll.Info().Str("conn-uuid", uuid).Msg("removing mesh connection") + h.removeMeshConnection(uuid) + } } - - time.Sleep(time.Second) } } From b757de7dd7726370fe389d4afa41680ac703a164 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 08:33:57 +0100 Subject: [PATCH 13/21] Absolute metric ton of logging adds and adjusts --- e2core/command/mod_start.go | 6 ++- e2core/command/start.go | 2 +- e2core/server/dispatcher.go | 32 ++++++++++++++-- e2core/server/handlers.go | 1 + e2core/server/server.go | 4 +- foundation/bus/bus/bus.go | 3 +- foundation/bus/bus/connectionhandler.go | 13 +++++-- foundation/bus/bus/hub.go | 38 +++++++++++++++---- foundation/bus/bus/options.go | 2 +- foundation/bus/bus/pod.go | 18 +++++++-- foundation/bus/transport/kafka/tester/main.go | 2 +- foundation/bus/transport/nats/tester/main.go | 2 +- .../bus/transport/websocket/tester/main.go | 2 +- .../bus/transport/websocket/transport.go | 21 +++++++--- foundation/scheduler/core.go | 5 ++- foundation/scheduler/scheduler.go | 4 ++ sat/sat/config.go | 14 ------- sat/sat/meshed.go | 16 ++++++-- sat/sat/sat.go | 4 +- sat/sat/tracing.go | 4 +- 20 files changed, 142 insertions(+), 51 deletions(-) diff --git a/e2core/command/mod_start.go b/e2core/command/mod_start.go index bf3eb56f..4df9e2ee 100644 --- a/e2core/command/mod_start.go +++ b/e2core/command/mod_start.go @@ -61,14 +61,18 @@ func ModStart() *cobra.Command { } if httpPort > 0 { config.Port = httpPort - l.Debug().Int("port", httpPort).Msg(fmt.Sprintf("Using port :%d for the sat backend", httpPort)) + l.Info().Int("port", httpPort).Msg(fmt.Sprintf("Using port :%d for the sat backend", httpPort)) } + l.Info().Interface("sdkTrace-config", config.TracerConfig).Msg("this is the sdkTrace config we're using") + traceProvider, err := sat.SetupTracing(config.TracerConfig, l) if err != nil { return errors.Wrap(err, "setup tracing") } + l.Info().Msg("successfully set up tracing") + mctx, mcancel := context.WithTimeout(context.Background(), 5*time.Second) defer mcancel() diff --git a/e2core/command/start.go b/e2core/command/start.go index 5dd695d3..c99958fb 100644 --- a/e2core/command/start.go +++ b/e2core/command/start.go @@ -143,7 +143,7 @@ func setupLogger() zerolog.Logger { logger := zerolog.New(os.Stderr).With(). Timestamp(). - Str("command", "start"). + Str("mode", "mothership"). Str("version", release.Version). Logger().Level(zerolog.InfoLevel) diff --git a/e2core/server/dispatcher.go b/e2core/server/dispatcher.go index 0488ea47..ff4d1407 100644 --- a/e2core/server/dispatcher.go +++ b/e2core/server/dispatcher.go @@ -56,17 +56,23 @@ func newDispatcher(l zerolog.Logger, pod *bus.Pod) *dispatcher { // Execute returns the "final state" of a Sequence. If the state's err is not nil, it means a runnable returned an error, and the Directive indicates the Sequence should return. // if exec itself actually returns an error other than ErrSequenceRunErr, it means there was a problem executing the Sequence as described, and should be treated as such. func (d *dispatcher) Execute(seq *sequence.Sequence) error { + ll := d.log.With().Str("requestID", seq.ParentID()).Logger() + ll.Info().Interface("dispatcher-pod", d.pod).Msg("created a sequence dispatcher") s := &sequenceDispatcher{ seq: seq, pod: d.pod, - log: d.log, + log: ll.With().Str("part", "sequenceDispatcher").Logger(), } + ll.Info().Msg("creating a result chan, and a callback function that takes in a result, and sends that result back into the resultchan.") + resultChan := make(chan *sequence.ExecResult) cb := func(result *sequence.ExecResult) { + ll.Info().Msg("callback: sending result to resultchan") resultChan <- result } + ll.Info().Msg("this callback is added to the sequence.parentID in the dispatcher. It's just a map. One sequence ID, one callback") d.addCallback(seq.ParentID(), cb) defer d.removeCallback(seq.ParentID()) @@ -75,6 +81,8 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { return errors.New("sequence contains no steps") } + ll.Info().Interface("first-step", firstStep).Msg("dispatchsingle gets called on the sequence dispatcher. Arguments are the results channel and the first step.") + if err := s.dispatchSingle(firstStep, resultChan); err != nil { return errors.Wrap(err, "failed to dispatchSingle") } @@ -106,14 +114,18 @@ func (s *sequenceDispatcher) dispatchSingle(step *sequence.Step, resultChan chan return errors.Wrap(err, "failed to req.toJSON") } + s.log.Info().Str("data in dispatchSingle", string(data)).Msg("message about to be sent") + msg := bus.NewMsgWithParentID(step.FQMN, s.seq.ParentID(), data) - // find an appropriate peer and tunnel the first excution to them + s.log.Info().Interface("bus.Message", msg).Msg("bus msg. Next is pod.tunnel with step.fqmn with message.") + + // find an appropriate peer and tunnel the first execution to them if err := s.pod.Tunnel(step.FQMN, msg); err != nil { return errors.Wrap(err, "failed to Tunnel") } - s.log.Debug().Str("parentID", s.seq.ParentID()). + s.log.Info(). Str("msgUUID", msg.UUID()). Msg("dispatched execution for parent to peer with message") @@ -123,14 +135,19 @@ func (s *sequenceDispatcher) dispatchSingle(step *sequence.Step, resultChan chan func (s *sequenceDispatcher) awaitResult(resultChan chan *sequence.ExecResult) error { select { case result := <-resultChan: + s.log.Info().Msg("we have a message back from the result channel") if result.Response == nil { + s.log.Error().Msg("sadly the response was nil") return fmt.Errorf("recieved nil response for %s", result.FQMN) } + s.log.Info().Msg("handling the step results") if err := s.seq.HandleStepResults([]sequence.ExecResult{*result}); err != nil { + s.log.Err(err).Msg("something went wrong while handling the step results") return errors.Wrap(err, "failed to HandleStepResults") } case <-time.After(time.Second * 10): + s.log.Warn().Msg("dispatchSingle timeout reached") return ErrDispatchTimeout } @@ -140,21 +157,28 @@ func (s *sequenceDispatcher) awaitResult(resultChan chan *sequence.ExecResult) e // onMsgHandler is called when a new message is received from the pod func (d *dispatcher) onMsgHandler() bus.MsgFunc { return func(msg bus.Message) error { + ll := d.log.With().Str("requestID", msg.ParentID()).Logger() d.lock.RLock() defer d.lock.RUnlock() + + ll.Info().Msg("message received to dispatcher.onMsgHandler") + // we only care about the messages related to our specific sequence callback, exists := d.callbacks[msg.ParentID()] if !exists { + ll.Warn().Str("uuid", msg.ParentID()).Msg("did not exist") return nil } result := &sequence.ExecResult{} if err := json.Unmarshal(msg.Data(), result); err != nil { - d.log.Err(err).Msg("json.Unmarshal message data failure") + ll.Err(err).Msg("json.Unmarshal message data failure") return nil } + ll.Info().Str("requestID", msg.ParentID()).Msg("calling the callback with the result") + callback(result) return nil diff --git a/e2core/server/handlers.go b/e2core/server/handlers.go index 5d8382eb..73a8f34b 100644 --- a/e2core/server/handlers.go +++ b/e2core/server/handlers.go @@ -24,6 +24,7 @@ func (s *Server) executePluginByNameHandler() echo.HandlerFunc { name := ReadParam(c, "name") ll := s.logger.With(). + Str("requestID", c.Response().Header().Get(echo.HeaderXRequestID)). Str("ident", ident). Str("namespace", namespace). Str("fn", name). diff --git a/e2core/server/server.go b/e2core/server/server.go index 2ec27d92..7f8189d6 100644 --- a/e2core/server/server.go +++ b/e2core/server/server.go @@ -36,7 +36,7 @@ type Server struct { // New creates a new Server instance. func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, error) { - ll := l.With().Str("module", "server").Logger() + ll := l.With().Str("module", "e2core-server").Logger() // @todo https://github.com/suborbital/e2core/issues/144, the first return value is a function that would close the // tracer in case of a shutdown. Usually that is put in a defer statement. Server doesn't have a graceful shutdown. @@ -48,6 +48,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, busOpts := []bus.OptionsModifier{ bus.UseMeshTransport(websocket.New()), bus.UseDiscovery(local.New()), + bus.UseLogger(ll), } b := bus.New(busOpts...) @@ -58,6 +59,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, e.Use( mid.UUIDRequestID(), + mid.Logger(ll, nil), otelecho.Middleware("e2core"), middleware.Recover(), ) diff --git a/foundation/bus/bus/bus.go b/foundation/bus/bus/bus.go index d663a7d3..9223e027 100644 --- a/foundation/bus/bus/bus.go +++ b/foundation/bus/bus/bus.go @@ -33,7 +33,7 @@ func New(opts ...OptionsModifier) *Bus { BelongsTo: options.BelongsTo, Interests: options.Interests, bus: newMessageBus(), - logger: options.Logger, + logger: options.Logger.With().Str("bus", "bus").Logger(), } // the hub handles coordinating the transport and discovery plugins @@ -71,6 +71,7 @@ func (b *Bus) ConnectBridgeTopic(topic string) error { // This bypasses the main Bus bus, which is why it isn't a method on Pod. // Messages are load balanced between the connections that advertise the capability in question. func (b *Bus) Tunnel(capability string, msg Message) error { + b.logger.Info().Str("requestID", msg.ParentID()).Str("fqmn", capability).Msg("bus.Tunnel is happening (pod tunnelfunc?)") return b.hub.sendTunneledMessage(capability, msg) } diff --git a/foundation/bus/bus/connectionhandler.go b/foundation/bus/bus/connectionhandler.go index f920c68c..e5c0524a 100644 --- a/foundation/bus/bus/connectionhandler.go +++ b/foundation/bus/bus/connectionhandler.go @@ -20,7 +20,7 @@ type connectionHandler struct { // Start starts up a listener to read messages from the connection into the Grav bus func (c *connectionHandler) Start() { - ll := c.Log.With().Str("method", "Start").Logger() + ll := c.Log.With().Str("method", "connectionHandler.Start").Logger() withdrawChan := c.Signaler.Listen() go func() { @@ -42,11 +42,12 @@ func (c *connectionHandler) Start() { for { msg, connWithdraw, err := c.Conn.ReadMsg() if err != nil { + // the error that happened is not an "I withdrew" or "my peer withdrew", it's a broken conn if !(c.Signaler.SelfWithdrawn() || c.Signaler.PeerWithdrawn()) { - ll.Err(err).Str("connectionUUID", c.UUID).Msg("failed to ReadMsg from connection") + ll.Err(err).Str("connectionUUID", c.UUID).Msg("failed to ReadMsg from connection, sending to errchan") c.ErrChan <- err } else { - ll.Debug().Msgf("failed to ReadMsg from withdrawn connection, ignoring: %s", err.Error()) + ll.Err(err).Msg("failed to ReadMsg from withdrawn connection, ignoring") } return @@ -60,7 +61,7 @@ func (c *connectionHandler) Start() { return } - ll.Debug().Str("messageUUID", msg.UUID()).Msg("received message") + ll.Debug().Str("messageUUID", msg.UUID()).Str("requestID", msg.ParentID()).Msg("received message") c.Pod.Send(msg) } @@ -68,15 +69,19 @@ func (c *connectionHandler) Start() { } func (c *connectionHandler) Send(msg Message) error { + ll := c.Log.With().Str("requestID", msg.ParentID()).Logger() if c.Signaler.PeerWithdrawn() { return ErrNodeWithdrawn } if err := c.Conn.SendMsg(msg); err != nil { + ll.Err(err).Msg("c.conn.sendmsg returned an error") c.ErrChan <- err return errors.Wrap(err, "failed to SendMsg") } + ll.Info().Msg("message sent successfully") + return nil } diff --git a/foundation/bus/bus/hub.go b/foundation/bus/bus/hub.go index 5b4b0119..dcb13e45 100644 --- a/foundation/bus/bus/hub.go +++ b/foundation/bus/bus/hub.go @@ -41,7 +41,7 @@ func initHub(nodeUUID string, options *Options, connectFunc func() *Pod) *hub { mesh: options.MeshTransport, bridge: options.BridgeTransport, discovery: options.Discovery, - log: options.Logger.With().Str("module", "hub").Logger().Level(zerolog.InfoLevel), + log: options.Logger.With().Str("module", "hub").Logger(), pod: connectFunc(), connectFunc: connectFunc, meshConnections: map[string]*connectionHandler{}, @@ -109,20 +109,33 @@ func (h *hub) messageHandler(msg Message) error { h.lock.RLock() defer h.lock.RUnlock() + ll := h.log.With().Str("requestID", msg.ParentID()).Str("method", "hub.messageHandler").Logger() + + ll.Info().Msg("sending the message to all meshconnections") + // send the message to each. withdrawn connections will result in a no-op for uuid := range h.meshConnections { + ll.Info(). + Str("meshconnection-uuid", uuid). + Msg("sending the message to the handler at this uuid") + handler := h.meshConnections[uuid] - handler.Send(msg) + err := handler.Send(msg) + if err != nil { + ll.Err(err).Str("meshconnection-uuid", uuid). + Msg("send returned an error") + } } return nil } func (h *hub) discoveryHandler() func(endpoint string, uuid string) { + ll := h.log.With().Str("method", "discoveryHandler").Logger() + return func(endpoint string, uuid string) { - ll := h.log.With().Str("method", "discoveryHandler").Logger() if uuid == h.nodeUUID { - ll.Debug().Msg("discovered self, discarding") + ll.Debug().Str("uuid", uuid).Msg("discovered self, discarding") return } @@ -391,13 +404,20 @@ func (h *hub) scanFailedMeshConnections() { } func (h *hub) sendTunneledMessage(capability string, msg Message) error { - ll := h.log.With().Str("method", "sendTunneledMessage").Logger() + ll := h.log.With().Str("method", "sendTunneledMessage"). + Str("requestID", msg.ParentID()).Logger() + + ll.Info().Str("capability", capability).Msg("sending a message with cap. Checking the hub's capabilityBalancers map. It seems to be a list of UUIDs for ... things? belonging to the same capability.") balancer, exists := h.capabilityBalancers[capability] if !exists { return ErrTunnelNotEstablished } + ll.Info().Interface("balancer", balancer).Str("capability", capability).Msg("balancer for capability") + + ll.Info().Int("tunnel-retry-count", tunnelRetryCount).Msg("starting iteration to check whether we can send a message to someplace") + // iterate a reasonable number of times to find a connection that's not removed or dead for i := 0; i < tunnelRetryCount; i++ { @@ -407,18 +427,19 @@ func (h *hub) sendTunneledMessage(capability string, msg Message) error { defer h.lock.RUnlock() uuid := balancer.Next() + ll.Info().Int("iteration", i).Str("uuid", uuid).Msg("balancer next") if uuid == "" { return nil, ErrTunnelNotEstablished } handler, exists := h.meshConnections[uuid] + ll.Info().Bool("handler-exists", exists).Msg("hub has a meshconnections map") if !exists { return nil, ErrTunnelNotEstablished } return handler, nil }() - if err != nil { continue } @@ -426,11 +447,14 @@ func (h *hub) sendTunneledMessage(capability string, msg Message) error { if handler.Conn != nil { if err := handler.Send(msg); err != nil { ll.Err(err).Msg("failed to SendMsg on tunneled connection, will remove") + return errors.Wrap(err, "handler.Send died") } else { - ll.Debug().Str("handlerUUID", handler.UUID).Msg("tunneled to handler") + ll.Info().Str("handlerUUID", handler.UUID).Msg("tunneled to handler") return nil } } + + ll.Info().Msg("handler connection was nil") } return ErrTunnelNotEstablished diff --git a/foundation/bus/bus/options.go b/foundation/bus/bus/options.go index 2859bc80..72d19c69 100644 --- a/foundation/bus/bus/options.go +++ b/foundation/bus/bus/options.go @@ -91,7 +91,7 @@ func defaultOptions() *Options { o := &Options{ BelongsTo: "*", Interests: []string{}, - Logger: zerolog.New(os.Stderr).With().Timestamp().Logger(), + Logger: zerolog.New(os.Stderr).With().Str("mode", "default-options").Timestamp().Logger(), Port: "8080", URI: "/meta/message", MeshTransport: nil, diff --git a/foundation/bus/bus/pod.go b/foundation/bus/bus/pod.go index 08673f1f..7c9783cd 100644 --- a/foundation/bus/bus/pod.go +++ b/foundation/bus/bus/pod.go @@ -91,26 +91,38 @@ func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts, // It is safe to call methods on a nil ticket, they will error with ErrNoTicket // This means error checking can be done on a chained call such as err := p.Send(msg).Wait(...) func (p *Pod) Send(msg Message) *MsgReceipt { + ll := p.logger.With().Str("requestID", msg.ParentID()).Logger() + + ll.Info().Msg("sending message in pod.send") + // check to see if the pod has died (aka disconnected) if p.dead.Load().(bool) == true { + ll.Warn().Msg("pod has died") return nil } + ll.Info().Str("msg uuid", msg.UUID()).Msg("filtering message") + p.FilterUUID(msg.UUID(), false) // don't allow the same message to bounce back through this pod + ll.Info().Msg("sending message to the bus chan") + p.busChan <- msg - t := &MsgReceipt{ + ll.Info().Msg("sent message to bus chan") + + ll.Info().Msg("returning a message receipt") + + return &MsgReceipt{ UUID: msg.UUID(), pod: p, } - - return t } // Tunnel bypasses the pod's normal 'Send' and uses the bus itself to tunnel to a specific peer // if a transport is enabled. If not, it's a no-op. func (p *Pod) Tunnel(capability string, msg Message) error { + p.logger.Info().Str("requestID", msg.ParentID()).Str("fqmn", capability).Msg("tunneling using the pod's tunnelfunc") return p.tunnelFunc(capability, msg) } diff --git a/foundation/bus/transport/kafka/tester/main.go b/foundation/bus/transport/kafka/tester/main.go index 426b1daa..f2a0c309 100644 --- a/foundation/bus/transport/kafka/tester/main.go +++ b/foundation/bus/transport/kafka/tester/main.go @@ -14,7 +14,7 @@ import ( ) func main() { - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(os.Stderr).With().Str("mode", "kafka-tester").Timestamp().Logger() knats, err := kafka.New("127.0.0.1:9092") if err != nil { diff --git a/foundation/bus/transport/nats/tester/main.go b/foundation/bus/transport/nats/tester/main.go index 5f0d208a..fcb9bb87 100644 --- a/foundation/bus/transport/nats/tester/main.go +++ b/foundation/bus/transport/nats/tester/main.go @@ -14,7 +14,7 @@ import ( ) func main() { - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(os.Stderr).With().Str("mode", "nats-tester").Timestamp().Logger() gnats, err := nats.New("nats://localhost:4222") if err != nil { diff --git a/foundation/bus/transport/websocket/tester/main.go b/foundation/bus/transport/websocket/tester/main.go index b4883664..f4aeac5b 100644 --- a/foundation/bus/transport/websocket/tester/main.go +++ b/foundation/bus/transport/websocket/tester/main.go @@ -14,7 +14,7 @@ import ( ) func main() { - logger := zerolog.New(os.Stderr).With().Timestamp().Logger() + logger := zerolog.New(os.Stderr).With().Str("mode", "websocket-tester").Timestamp().Logger() gwss := websocket.New() locald := local.New() diff --git a/foundation/bus/transport/websocket/transport.go b/foundation/bus/transport/websocket/transport.go index 18aef08b..dceb570c 100644 --- a/foundation/bus/transport/websocket/transport.go +++ b/foundation/bus/transport/websocket/transport.go @@ -93,45 +93,55 @@ func (t *Transport) HTTPHandlerFunc() http.HandlerFunc { return } + t.log.Info().Msg("receiving a message I think") + c, err := upgrader.Upgrade(w, r, nil) if err != nil { t.log.Err(err).Msg("could not upgrade connection to websocket") return } - t.log.Debug().Str("connectionURL", r.URL.String()).Msg("upgraded connection") + t.log.Info().Str("connectionURL", r.URL.String()).Msg("upgraded connection") conn := &Conn{ conn: c, log: t.log, } + t.log.Info().Interface("connectionfunc", t.connectionFunc).Msg("connection func is this, apparently, bus.Connect, again? request is in the conn.conn as an upgraded websocket connection") + t.connectionFunc(conn) } } // SendMsg sends a message to the connection func (c *Conn) SendMsg(msg bus.Message) error { + ll := c.log.With().Str("requestID", msg.ParentID()). + Str("msg-uuid", msg.UUID()). + Str("node-uuid", c.nodeUUID).Logger() + msgBytes, err := msg.Marshal() if err != nil { return errors.Wrap(err, "[transport-websocket] failed to Marshal message") } - c.log.Debug().Str("msgUUID", msg.UUID()). - Str("nodeUUID", c.nodeUUID).Msg("sending message to connection") + ll.Info().Msg("sending message to connection over binary") if err := c.WriteMessage(websocket.BinaryMessage, msgBytes); err != nil { if errors.Is(err, websocket.ErrCloseSent) { + ll.Err(err).Msg("websocket error close sent bla bla") return bus.ErrConnectionClosed } else if err == bus.ErrNodeWithdrawn { + ll.Err(err).Msg("node was withdrawn") return err } + ll.Err(err).Msg("some super different error with connection") + return errors.Wrap(err, "[transport-websocket] failed to WriteMessage") } - c.log.Debug().Str("msgUUID", msg.UUID()). - Str("nodeUUID", c.nodeUUID).Msg("sent message to connection") + ll.Info().Msg("sent message to connection") return nil } @@ -160,6 +170,7 @@ func (c *Conn) ReadMsg() (bus.Message, *bus.Withdraw, error) { } c.log.Debug(). + Str("requestID", msg.ParentID()). Str("msgUUID", msg.UUID()). Str("nodeUUID", c.nodeUUID). Msg("received message from node") diff --git a/foundation/scheduler/core.go b/foundation/scheduler/core.go index 410363ac..7c45e1d4 100644 --- a/foundation/scheduler/core.go +++ b/foundation/scheduler/core.go @@ -39,6 +39,8 @@ func newCore(log zerolog.Logger) *core { func (c *core) do(job *Job) *Result { result := newResult(job.UUID()) + c.log.Info().Msg("core.do function got called") + jobWorker := c.scaler.findWorker(job.jobType) if jobWorker == nil { result.sendErr(fmt.Errorf("failed to getWorker for jobType %q", job.jobType)) @@ -47,10 +49,11 @@ func (c *core) do(job *Job) *Result { go func() { job.result = result - + c.log.Info().Msg("jobworker got a job scheduled") jobWorker.schedule(job) }() + c.log.Info().Msg("returning result from core.do func") return result } diff --git a/foundation/scheduler/scheduler.go b/foundation/scheduler/scheduler.go index 0f76abf1..ce1188f7 100644 --- a/foundation/scheduler/scheduler.go +++ b/foundation/scheduler/scheduler.go @@ -50,6 +50,8 @@ func NewWithLogger(log zerolog.Logger) *Scheduler { // Do schedules a job to be worked on and returns a result object func (r *Scheduler) Do(job Job) *Result { + r.log.Info().Msg("scheduler.Do function got called, passing it on to core.do") + return r.core.do(&job) } @@ -134,6 +136,8 @@ func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Mess // each time a message is received with the associated type, // execute the associated job and pass the result to `run` pod.OnType(msgType, func(msg bus.Message) error { + r.log.Info().Str("msgType", msgType).Msg("scheduler.ListenAndRun called, msg turned into a job, and job passed to scheduler.Do function") + result, err := helper(msg.Data()).Then() run(msg, result, err) diff --git a/sat/sat/config.go b/sat/sat/config.go index d47ec739..8f8ef492 100644 --- a/sat/sat/config.go +++ b/sat/sat/config.go @@ -74,7 +74,6 @@ func ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleA // next, handle the module arg being a URL, an FQMN, or a path on disk if isURL(moduleArg) { - logger.Debug().Msg("fetching module from URL") tmpFile, err := downloadFromURL(moduleArg) if err != nil { return nil, errors.Wrap(err, "failed to downloadFromURL") @@ -83,8 +82,6 @@ func ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleA moduleArg = tmpFile } else if FQMN, err = fqmn.Parse(moduleArg); err == nil { if useControlPlane { - logger.Debug().Msg("fetching module from control plane") - cpModule, err := appClient.GetModule(moduleArg) if err != nil { return nil, errors.Wrap(err, "failed to GetModule") @@ -130,17 +127,6 @@ func ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleA jobType = module.FQMN prettyName = fmt.Sprintf("%s-%s", jobType, opts.ProcUUID[:6]) - - logger = logger.With(). - Str("app", prettyName). - Str("jobType", jobType). - Str("tenant", FQMN.Tenant). - Logger() - - logger.Debug().Msg("configuring") - logger.Debug().Msg("joining tenant") - } else { - logger.Debug().Str("jobType", jobType).Msg("configuring") } conns := make([]tenant.Connection, 0) diff --git a/sat/sat/meshed.go b/sat/sat/meshed.go index 4ceb1ac5..39dade35 100644 --- a/sat/sat/meshed.go +++ b/sat/sat/meshed.go @@ -19,7 +19,8 @@ import ( // when a meshed peer sends us a job, it is executed by Reactr and then // the result is passed into this function for handling func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { - ll := s.logger.With().Str("method", "handleFnResult").Logger() + ll := s.logger.With().Str("method", "handleFnResult"). + Str("requestID", msg.ParentID()).Logger() // first unmarshal the request and sequence information req, err := request.FromJSON(msg.Data()) @@ -55,12 +56,15 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { if fnErr != nil { if fnRunErr, isRunErr := fnErr.(scheduler.RunErr); isRunErr { + ll.Err(fnErr).Msg("it's a run error") // great, it's a runErr runErr = fnRunErr } else { + ll.Err(fnErr).Msg("it's an exec error") execErr = fnErr } } else { + ll.Info().Msg("result is a coordinated response, hopefully") resp = result.(*request.CoordinatedResponse) } @@ -78,6 +82,8 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { }(), } + ll.Info().Msg("sending the fn result back") + if err = s.sendFnResult(fnr, spanCtx); err != nil { ll.Err(err).Msg("s.sendFnResult") return @@ -89,6 +95,8 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { return } + ll.Info().Msg("dealing with exec result for fn result") + if err = seq.HandleStepResults([]sequence.ExecResult{*fnr}); err != nil { ll.Err(err).Msg("seq.HandleStepResults") return @@ -103,6 +111,7 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { req.SequenceJSON = stepJSON + ll.Info().Msg("sending next step") s.sendNextStep(msg, seq, req, spanCtx) } @@ -123,6 +132,7 @@ func (s *Sat) sendFnResult(result *sequence.ExecResult, ctx context.Context) err respMsg := bus.NewMsgWithParentID(server.MsgTypeSuborbitalResult, reqID, fnrJSON) s.logger.Debug(). + Str("requestID", reqID). Str("method", "sendFnResult"). Str("function", s.config.JobType). Str("respUUID", respMsg.UUID()). @@ -135,8 +145,8 @@ func (s *Sat) sendFnResult(result *sequence.ExecResult, ctx context.Context) err return nil } -func (s *Sat) sendNextStep(_ bus.Message, seq *sequence.Sequence, req *request.CoordinatedRequest, ctx context.Context) { - ll := s.logger.With().Str("method", "sendNextStep").Logger() +func (s *Sat) sendNextStep(msg bus.Message, seq *sequence.Sequence, req *request.CoordinatedRequest, ctx context.Context) { + ll := s.logger.With().Str("method", "sendNextStep").Str("requestID", msg.ParentID()).Logger() span := trace.SpanFromContext(ctx) defer span.End() diff --git a/sat/sat/sat.go b/sat/sat/sat.go index 90318a49..12941dbd 100644 --- a/sat/sat/sat.go +++ b/sat/sat/sat.go @@ -82,11 +82,13 @@ func New(config *Config, logger zerolog.Logger, traceProvider trace.TracerProvid // if a "transport" is configured, enable bus and metrics endpoints, otherwise enable server mode if config.ControlPlaneUrl != "" { + logger.Info().Msg("controlplane url is present, creating the websocket for transport, and the meta/message and meta/metrics endpoints") sat.transport = websocket.New() sat.server.GET("/meta/message", echo.WrapHandler(sat.transport.HTTPHandlerFunc())) sat.server.GET("/meta/metrics", sat.workerMetricsHandler()) } else { + logger.Info().Msg("controlplane url is not present, pass anything to sat.handler") // allow any HTTP method sat.server.Any("*", sat.handler(engine)) } @@ -170,7 +172,7 @@ func (s *Sat) setupBus() { opts := []bus.OptionsModifier{ bus.UseBelongsTo(s.config.Tenant), bus.UseInterests(s.config.JobType), - bus.UseLogger(s.logger), + bus.UseLogger(s.logger.With().Str("source", "sat.setupBus").Logger()), bus.UseMeshTransport(s.transport), bus.UseDiscovery(local.New()), bus.UseEndpoint(fmt.Sprintf("%d", s.config.Port), "/meta/message"), diff --git a/sat/sat/tracing.go b/sat/sat/tracing.go index b7cbb8f2..a995eadb 100644 --- a/sat/sat/tracing.go +++ b/sat/sat/tracing.go @@ -56,6 +56,7 @@ func SetupTracing(config options.TracerConfig, logger zerolog.Logger) (*trace.Tr conn, err := observability.GrpcConnection(ctx, config.Collector.Endpoint) if err != nil { + ll.Err(err).Msg("observability.GrcpConnection failed") return nil, errors.Wrap(err, "collector GrpcConnection") } @@ -64,10 +65,11 @@ func SetupTracing(config options.TracerConfig, logger zerolog.Logger) (*trace.Tr ServiceName: config.ServiceName, }) if err != nil { + ll.Err(err).Msg("observability.OtelTracer failed") return nil, errors.Wrap(err, "observability.OtelTracer") } - ll.Info().Msg("created collector trace exporter") + ll.Info().Msg("created collector sdkTrace exporter") return traceProvider, nil default: From fa1f247c0721e16c4d9d88c2b345f82364f646ed Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 12:30:31 +0100 Subject: [PATCH 14/21] Fix tracing in e2core --- e2core/command/mod_start.go | 5 +- e2core/options/options.go | 35 +++++- e2core/server/init.go | 60 ---------- e2core/server/server.go | 3 +- e2core/syncer/syncer.go | 2 +- .../bus/transport/websocket/transport.go | 1 + foundation/tracing/tracing.go | 110 ++++++++++++++++++ sat/main.go | 5 +- sat/sat/config.go | 29 ++++- sat/sat/handler.go | 5 +- sat/sat/meshed.go | 3 +- sat/sat/sat.go | 11 +- sat/sat/sat_test.go | 12 +- sat/sat/tracing.go | 89 -------------- 14 files changed, 194 insertions(+), 176 deletions(-) create mode 100644 foundation/tracing/tracing.go delete mode 100644 sat/sat/tracing.go diff --git a/e2core/command/mod_start.go b/e2core/command/mod_start.go index 4df9e2ee..d2e53860 100644 --- a/e2core/command/mod_start.go +++ b/e2core/command/mod_start.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/cobra" "github.com/suborbital/e2core/e2core/release" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/sat" "github.com/suborbital/e2core/sat/sat/metrics" satOptions "github.com/suborbital/e2core/sat/sat/options" @@ -66,7 +67,7 @@ func ModStart() *cobra.Command { l.Info().Interface("sdkTrace-config", config.TracerConfig).Msg("this is the sdkTrace config we're using") - traceProvider, err := sat.SetupTracing(config.TracerConfig, l) + traceProvider, err := tracing.SetupTracing(config.TracerConfig, l) if err != nil { return errors.Wrap(err, "setup tracing") } @@ -83,7 +84,7 @@ func ModStart() *cobra.Command { defer traceProvider.Shutdown(context.Background()) - satInstance, err := sat.New(config, l, traceProvider, mtx) + satInstance, err := sat.New(config, l, mtx) if err != nil { return errors.Wrap(err, "failed to sat.New") } diff --git a/e2core/options/options.go b/e2core/options/options.go index f79b6ab3..5539cd84 100644 --- a/e2core/options/options.go +++ b/e2core/options/options.go @@ -7,6 +7,8 @@ import ( "github.com/pkg/errors" "github.com/sethvargo/go-envconfig" + + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -29,7 +31,8 @@ type Options struct { Domain string `env:"E2CORE_DOMAIN"` HTTPPort int `env:"E2CORE_HTTP_PORT,default=8080"` TLSPort int `env:"E2CORE_TLS_PORT,default=443"` - TracerConfig TracerConfig `env:",prefix=E2CORE_TRACER_"` + EnvTracerConfig TracerConfig `env:",prefix=E2CORE_TRACER_"` + TracerConfig tracing.Config } // TracerConfig holds values specific to setting up the tracer. It's only used in proxy mode. All configuration options @@ -149,11 +152,37 @@ func (o *Options) finalize() error { o.Features = envOpts.Features o.EnvironmentToken = "" - o.TracerConfig = TracerConfig{} + o.EnvTracerConfig = TracerConfig{} o.StaticPeers = envOpts.StaticPeers o.EnvironmentToken = envOpts.EnvironmentToken - o.TracerConfig = envOpts.TracerConfig + o.EnvTracerConfig = envOpts.EnvTracerConfig + + tc := tracing.Config{ + ServiceName: envOpts.EnvTracerConfig.ServiceName, + Probability: envOpts.EnvTracerConfig.Probability, + } + + switch envOpts.EnvTracerConfig.TracerType { + case "collector": + tc.Type = tracing.ExporterCollector + case "honeycomb": + tc.Type = tracing.ExporterHoneycomb + } + + if envOpts.EnvTracerConfig.HoneycombConfig != nil { + tc.Honeycomb = tracing.HoneycombConfig{ + Endpoint: envOpts.EnvTracerConfig.HoneycombConfig.Endpoint, + APIKey: envOpts.EnvTracerConfig.HoneycombConfig.APIKey, + Dataset: envOpts.EnvTracerConfig.HoneycombConfig.Dataset, + } + } + + if envOpts.EnvTracerConfig.Collector != nil { + tc.Collector = tracing.CollectorConfig{Endpoint: envOpts.EnvTracerConfig.Collector.Endpoint} + } + + o.TracerConfig = tc return nil } diff --git a/e2core/server/init.go b/e2core/server/init.go index a5605858..abb4e431 100644 --- a/e2core/server/init.go +++ b/e2core/server/init.go @@ -1,61 +1 @@ package server - -import ( - "context" - - "github.com/pkg/errors" - "github.com/rs/zerolog" - - "github.com/suborbital/e2core/e2core/options" - "github.com/suborbital/go-kit/observability" -) - -// setupTracing configure open telemetry to be used with otel exporter. Returns a tracer closer func and an error. -func setupTracing(config options.TracerConfig, logger zerolog.Logger) (func(ctx context.Context) error, error) { - l := logger.With().Str("function", "setupTracing").Logger() - emptyShutdown := func(_ context.Context) error { return nil } - - switch config.TracerType { - case "honeycomb": - conn, err := observability.GrpcConnection(context.Background(), config.HoneycombConfig.Endpoint) - if err != nil { - return emptyShutdown, errors.Wrapf(err, "observability.GrpcConnection to %s", config.HoneycombConfig.Endpoint) - } - - tp, err := observability.HoneycombTracer(context.Background(), conn, observability.HoneycombTracingConfig{ - TracingConfig: observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }, - APIKey: config.HoneycombConfig.APIKey, - Dataset: config.HoneycombConfig.Dataset, - }) - if err != nil { - return emptyShutdown, errors.Wrap(err, "observability.HoneycombTracer") - } - - return tp.Shutdown, nil - case "collector": - conn, err := observability.GrpcConnection(context.Background(), config.Collector.Endpoint) - if err != nil { - return emptyShutdown, errors.Wrap(err, "observability.GrpcConnection") - } - - tp, err := observability.OtelTracer(context.Background(), conn, observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }) - - return tp.Shutdown, nil - default: - l.Warn().Str("tracerType", config.TracerType).Msg("unrecognised tracer type configuration. Defaulting to no tracer") - fallthrough - case "none", "": - tp, err := observability.NoopTracer() - if err != nil { - return emptyShutdown, errors.Wrap(err, "observability.NoopTracer") - } - - return tp.Shutdown, nil - } -} diff --git a/e2core/server/server.go b/e2core/server/server.go index 7f8189d6..43d461b4 100644 --- a/e2core/server/server.go +++ b/e2core/server/server.go @@ -16,6 +16,7 @@ import ( "github.com/suborbital/e2core/foundation/bus/bus" "github.com/suborbital/e2core/foundation/bus/discovery/local" "github.com/suborbital/e2core/foundation/bus/transport/websocket" + "github.com/suborbital/e2core/foundation/tracing" kitError "github.com/suborbital/go-kit/web/error" "github.com/suborbital/go-kit/web/mid" ) @@ -40,7 +41,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server, // @todo https://github.com/suborbital/e2core/issues/144, the first return value is a function that would close the // tracer in case of a shutdown. Usually that is put in a defer statement. Server doesn't have a graceful shutdown. - _, err := setupTracing(opts.TracerConfig, ll) + _, err := tracing.SetupTracing(opts.TracerConfig, ll) if err != nil { return nil, errors.Wrapf(err, "setupTracing(%s, %s, %f)", "e2core", "reporter_uri", 0.04) } diff --git a/e2core/syncer/syncer.go b/e2core/syncer/syncer.go index 3420be46..aa60f3cd 100644 --- a/e2core/syncer/syncer.go +++ b/e2core/syncer/syncer.go @@ -36,7 +36,7 @@ type syncJob struct { // New creates a syncer with the given SystemSource func New(opts *options.Options, logger zerolog.Logger, source system.Source) *Syncer { s := &Syncer{ - sched: scheduler.New(), + sched: scheduler.NewWithLogger(logger), opts: opts, job: &syncJob{ systemSource: source, diff --git a/foundation/bus/transport/websocket/transport.go b/foundation/bus/transport/websocket/transport.go index dceb570c..6bfe801a 100644 --- a/foundation/bus/transport/websocket/transport.go +++ b/foundation/bus/transport/websocket/transport.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "github.com/suborbital/e2core/foundation/bus/bus" + "github.com/suborbital/e2core/foundation/tracing" ) const ( diff --git a/foundation/tracing/tracing.go b/foundation/tracing/tracing.go new file mode 100644 index 00000000..41af70a2 --- /dev/null +++ b/foundation/tracing/tracing.go @@ -0,0 +1,110 @@ +package tracing + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/rs/zerolog" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/go-kit/observability" +) + +const ( + ExporterCollector CollectorType = "collector" + ExporterHoneycomb CollectorType = "honeycomb" +) + +type CollectorType string + +type Config struct { + Type CollectorType + ServiceName string + Probability float64 + Collector CollectorConfig + Honeycomb HoneycombConfig +} + +type CollectorConfig struct { + Endpoint string +} + +type HoneycombConfig struct { + Endpoint string + APIKey string + Dataset string +} + +var Tracer trace.Tracer + +// SetupTracing configure open telemetry to be used with otel exporter. Returns a tracer closer func and an error. +func SetupTracing(config Config, logger zerolog.Logger) (*sdkTrace.TracerProvider, error) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + var traceProvider *sdkTrace.TracerProvider + var err error + + ll := logger.With().Str("tracerType", string(config.Type)).Logger() + + switch config.Type { + case ExporterHoneycomb: + ll.Info().Msg("configuring honeycomb exporter for tracing") + + conn, err := observability.GrpcConnection(ctx, config.Honeycomb.Endpoint, nil) + if err != nil { + return nil, errors.Wrap(err, "honeycomb GrpcConnection") + } + + traceProvider, err = observability.HoneycombTracer(ctx, conn, observability.HoneycombTracingConfig{ + TracingConfig: observability.TracingConfig{ + Probability: config.Probability, + ServiceName: config.ServiceName, + }, + APIKey: config.Honeycomb.APIKey, + Dataset: config.Honeycomb.Dataset, + }) + if err != nil { + return nil, errors.Wrap(err, "observability.HoneycombTracer") + } + + ll.Info().Msg("created honeycomb sdkTrace exporter") + case ExporterCollector: + ll.Info().Msg("configuring collector exporter for tracing") + + conn, err := observability.GrpcConnection(ctx, config.Collector.Endpoint) + if err != nil { + ll.Err(err).Msg("observability.GrcpConnection failed") + return nil, errors.Wrap(err, "collector GrpcConnection") + } + + traceProvider, err = observability.OtelTracer(ctx, conn, observability.TracingConfig{ + Probability: config.Probability, + ServiceName: config.ServiceName, + }) + if err != nil { + ll.Err(err).Msg("observability.OtelTracer failed") + return nil, errors.Wrap(err, "observability.OtelTracer") + } + + ll.Info().Msg("created collector sdkTrace exporter") + default: + ll.Warn().Msg("unrecognised tracer type configuration. Defaulting to no tracer") + fallthrough + case "none", "": + // Create the most default sdkTrace provider and escape early. + traceProvider, err = observability.NoopTracer() + if err != nil { + return nil, errors.Wrap(err, "noop Tracer") + } + + ll.Info().Msg("finished setting up default noop tracer") + } + + ll.Info().Msg("setting up a global tracer") + Tracer = traceProvider.Tracer("e2core-bebby-tracing") + + return traceProvider, nil +} diff --git a/sat/main.go b/sat/main.go index 4c52600a..24aa77b3 100644 --- a/sat/main.go +++ b/sat/main.go @@ -12,6 +12,7 @@ import ( "github.com/rs/zerolog" "github.com/sethvargo/go-envconfig" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/sat" "github.com/suborbital/e2core/sat/sat/metrics" satOptions "github.com/suborbital/e2core/sat/sat/options" @@ -45,7 +46,7 @@ func main() { // start starts up the Sat instance func start(logger zerolog.Logger, conf *sat.Config) error { - traceProvider, err := sat.SetupTracing(conf.TracerConfig, logger) + traceProvider, err := tracing.SetupTracing(conf.TracerConfig, logger) if err != nil { return errors.Wrap(err, "setup tracing") } @@ -60,7 +61,7 @@ func start(logger zerolog.Logger, conf *sat.Config) error { } // initialize Reactr, echo, and Bus and wrap them in a sat instance. - s, err := sat.New(conf, logger, traceProvider, mtx) + s, err := sat.New(conf, logger, mtx) if err != nil { return errors.Wrap(err, "sat.New") } diff --git a/sat/sat/config.go b/sat/sat/config.go index 8f8ef492..a00f2841 100644 --- a/sat/sat/config.go +++ b/sat/sat/config.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "gopkg.in/yaml.v3" + "github.com/suborbital/e2core/foundation/tracing" satOptions "github.com/suborbital/e2core/sat/sat/options" "github.com/suborbital/systemspec/capabilities" "github.com/suborbital/systemspec/fqmn" @@ -33,7 +34,7 @@ type Config struct { ControlPlaneUrl string EnvToken string ProcUUID string - TracerConfig satOptions.TracerConfig + TracerConfig tracing.Config MetricsConfig satOptions.MetricsConfig } @@ -136,6 +137,30 @@ func ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleA } } + tc := tracing.Config{ + ServiceName: opts.TracerConfig.ServiceName, + Probability: opts.TracerConfig.Probability, + } + + switch opts.TracerConfig.TracerType { + case "collector": + tc.Type = tracing.ExporterCollector + case "honeycomb": + tc.Type = tracing.ExporterHoneycomb + } + + if opts.TracerConfig.HoneycombConfig != nil { + tc.Honeycomb = tracing.HoneycombConfig{ + Endpoint: opts.TracerConfig.HoneycombConfig.Endpoint, + APIKey: opts.TracerConfig.HoneycombConfig.APIKey, + Dataset: opts.TracerConfig.HoneycombConfig.Dataset, + } + } + + if opts.TracerConfig.Collector != nil { + tc.Collector = tracing.CollectorConfig{Endpoint: opts.TracerConfig.Collector.Endpoint} + } + // finally, put it all together c := &Config{ ModuleArg: moduleArg, @@ -147,7 +172,7 @@ func ConfigFromModuleArg(logger zerolog.Logger, opts satOptions.Options, moduleA Connections: conns, Port: portInt, ControlPlaneUrl: controlPlane, - TracerConfig: opts.TracerConfig, + TracerConfig: tc, MetricsConfig: opts.MetricsConfig, ProcUUID: string(opts.ProcUUID), } diff --git a/sat/sat/handler.go b/sat/sat/handler.go index dbfef0ff..0b8e63a3 100644 --- a/sat/sat/handler.go +++ b/sat/sat/handler.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/engine2" "github.com/suborbital/e2core/sat/sat/metrics" "github.com/suborbital/systemspec/request" @@ -17,8 +18,8 @@ import ( func (s *Sat) handler(engine *engine2.Engine) echo.HandlerFunc { return func(c echo.Context) error { - spanCtx, span := s.tracer.Start(c.Request().Context(), "echoHandler", trace.WithAttributes( - attribute.String("request_id", c.Request().Header.Get("requestID")), + spanCtx, span := tracing.Tracer.Start(c.Request().Context(), "echoHandler", trace.WithAttributes( + attribute.String("requestID", c.Response().Header().Get(echo.HeaderXRequestID)), )) defer span.End() diff --git a/sat/sat/meshed.go b/sat/sat/meshed.go index 39dade35..cca0b8d0 100644 --- a/sat/sat/meshed.go +++ b/sat/sat/meshed.go @@ -12,6 +12,7 @@ import ( "github.com/suborbital/e2core/e2core/server" "github.com/suborbital/e2core/foundation/bus/bus" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/request" ) @@ -31,7 +32,7 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) { ctx := context.WithValue(context.Background(), "requestID", req.ID) - spanCtx, span := s.tracer.Start(ctx, "handleFnResult", trace.WithAttributes( + spanCtx, span := tracing.Tracer.Start(ctx, "handleFnResult", trace.WithAttributes( attribute.String("request_id", req.ID), )) defer span.End() diff --git a/sat/sat/sat.go b/sat/sat/sat.go index 12941dbd..af978c13 100644 --- a/sat/sat/sat.go +++ b/sat/sat/sat.go @@ -12,7 +12,7 @@ import ( "github.com/labstack/echo/v4/middleware" "github.com/pkg/errors" "github.com/rs/zerolog" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" "github.com/suborbital/e2core/foundation/bus/bus" "github.com/suborbital/e2core/foundation/bus/discovery/local" @@ -34,13 +34,12 @@ type Sat struct { pod *bus.Pod transport *websocket.Transport engine *engine2.Engine - tracer trace.Tracer metrics metrics.Metrics } // New initializes a Sat instance // if traceProvider is nil, the default NoopTraceProvider will be used -func New(config *Config, logger zerolog.Logger, traceProvider trace.TracerProvider, mtx metrics.Metrics) (*Sat, error) { +func New(config *Config, logger zerolog.Logger, mtx metrics.Metrics) (*Sat, error) { var module *tenant.WasmModuleRef if config.Module != nil && config.Module.WasmRef != nil && len(config.Module.WasmRef.Data) > 0 { @@ -61,20 +60,16 @@ func New(config *Config, logger zerolog.Logger, traceProvider trace.TracerProvid engine := engine2.New(config.JobType, module, engineAPI) - if traceProvider == nil { - traceProvider = trace.NewNoopTracerProvider() - } - sat := &Sat{ config: config, logger: logger, engine: engine, - tracer: traceProvider.Tracer("sat"), metrics: mtx, } sat.server = echo.New() sat.server.Use( + otelecho.Middleware("e2core-bebby"), middleware.Recover(), ) sat.server.HTTPErrorHandler = kitError.Handler(logger) diff --git a/sat/sat/sat_test.go b/sat/sat/sat_test.go index 7290df31..7c40d3d2 100644 --- a/sat/sat/sat_test.go +++ b/sat/sat/sat_test.go @@ -13,9 +13,11 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/sdk/trace" + sdkTrace "go.opentelemetry.io/otel/sdk/trace" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/sat/metrics" + "github.com/suborbital/e2core/sat/sat/options" ) func TestEchoRequest(t *testing.T) { @@ -96,18 +98,18 @@ func TestPanicRequest(t *testing.T) { `, string(body)) } -func satForFile(filepath string) (*Sat, *trace.TracerProvider, error) { - config, err := ConfigFromModuleArg(zerolog.Nop(), filepath) +func satForFile(filepath string) (*Sat, *sdkTrace.TracerProvider, error) { + config, err := ConfigFromModuleArg(zerolog.Nop(), options.Options{}, filepath) if err != nil { return nil, nil, err } - traceProvider, err := SetupTracing(config.TracerConfig, zerolog.Nop()) + traceProvider, err := tracing.SetupTracing(tracing.Config{}, zerolog.Nop()) if err != nil { return nil, nil, errors.Wrap(err, "setup tracing") } - sat, err := New(config, zerolog.Nop(), traceProvider, metrics.SetupNoopMetrics()) + sat, err := New(config, zerolog.Nop(), metrics.SetupNoopMetrics()) if err != nil { return nil, nil, err } diff --git a/sat/sat/tracing.go b/sat/sat/tracing.go deleted file mode 100644 index a995eadb..00000000 --- a/sat/sat/tracing.go +++ /dev/null @@ -1,89 +0,0 @@ -package sat - -import ( - "context" - "time" - - "github.com/pkg/errors" - "github.com/rs/zerolog" - "go.opentelemetry.io/otel/sdk/trace" - - "github.com/suborbital/e2core/sat/sat/options" - "github.com/suborbital/go-kit/observability" -) - -// SetupTracing configure open telemetry to be used with otel exporter. Returns a tracer closer func and an error. -func SetupTracing(config options.TracerConfig, logger zerolog.Logger) (*trace.TracerProvider, error) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - ll := logger.With().Str("tracerType", config.TracerType).Logger() - - switch config.TracerType { - case "honeycomb": - if config.HoneycombConfig == nil { - return nil, errors.New("missing honeycomb tracing config values") - } - - ll.Info().Msg("configuring honeycomb exporter for tracing") - - conn, err := observability.GrpcConnection(ctx, config.HoneycombConfig.Endpoint, nil) - if err != nil { - return nil, errors.Wrap(err, "honeycomb GrpcConnection") - } - - traceProvider, err := observability.HoneycombTracer(ctx, conn, observability.HoneycombTracingConfig{ - TracingConfig: observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }, - APIKey: config.HoneycombConfig.APIKey, - Dataset: config.HoneycombConfig.Dataset, - }) - if err != nil { - return nil, errors.Wrap(err, "observability.HoneycombTracer") - } - - ll.Info().Msg("created honeycomb trace exporter") - - return traceProvider, nil - case "collector": - if config.Collector == nil { - return nil, errors.New("missing collector tracing config values") - } - - ll.Info().Msg("configuring collector exporter for tracing") - - conn, err := observability.GrpcConnection(ctx, config.Collector.Endpoint) - if err != nil { - ll.Err(err).Msg("observability.GrcpConnection failed") - return nil, errors.Wrap(err, "collector GrpcConnection") - } - - traceProvider, err := observability.OtelTracer(ctx, conn, observability.TracingConfig{ - Probability: config.Probability, - ServiceName: config.ServiceName, - }) - if err != nil { - ll.Err(err).Msg("observability.OtelTracer failed") - return nil, errors.Wrap(err, "observability.OtelTracer") - } - - ll.Info().Msg("created collector sdkTrace exporter") - - return traceProvider, nil - default: - ll.Warn().Msg("unrecognised tracer type configuration. Defaulting to no tracer") - fallthrough - case "none", "": - // Create the most default trace provider and escape early. - traceProvider, err := observability.NoopTracer() - if err != nil { - return nil, errors.Wrap(err, "noop Tracer") - } - - ll.Debug().Msg("finished setting up default noop tracer") - - return traceProvider, nil - } -} From a98dda7998eace76e3098c88d04f60f77a495b21 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 12:30:55 +0100 Subject: [PATCH 15/21] Add spans to websocket transport handlerfunc --- foundation/bus/transport/websocket/transport.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/foundation/bus/transport/websocket/transport.go b/foundation/bus/transport/websocket/transport.go index 6bfe801a..aa8bd650 100644 --- a/foundation/bus/transport/websocket/transport.go +++ b/foundation/bus/transport/websocket/transport.go @@ -88,6 +88,11 @@ func (t *Transport) Connect(endpoint string) (bus.Connection, error) { // HTTPHandlerFunc returns an http.HandlerFunc for incoming connections func (t *Transport) HTTPHandlerFunc() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + ctx, span := tracing.Tracer.Start(r.Context(), "websocket.transport.httphanderfunc") + defer span.End() + + r = r.Clone(ctx) + if t.connectionFunc == nil { t.log.Error().Msg("incoming connection received, but no connFunc configured") w.WriteHeader(http.StatusInternalServerError) @@ -96,6 +101,7 @@ func (t *Transport) HTTPHandlerFunc() http.HandlerFunc { t.log.Info().Msg("receiving a message I think") + span.AddEvent("upgrading request to websocket connection") c, err := upgrader.Upgrade(w, r, nil) if err != nil { t.log.Err(err).Msg("could not upgrade connection to websocket") @@ -111,6 +117,7 @@ func (t *Transport) HTTPHandlerFunc() http.HandlerFunc { t.log.Info().Interface("connectionfunc", t.connectionFunc).Msg("connection func is this, apparently, bus.Connect, again? request is in the conn.conn as an upgraded websocket connection") + span.AddEvent("calling connection function") t.connectionFunc(conn) } } From 1d2c4765c5b9e2882cb90cce214022ca0dc84d1f Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Mon, 15 May 2023 16:03:06 +0100 Subject: [PATCH 16/21] Add tracing to most of e2core --- e2core/server/dispatcher.go | 30 +++++++++--- e2core/server/handlers.go | 17 ++++++- foundation/bus/bus/bus.go | 13 ++++- foundation/bus/bus/connectionhandler.go | 9 +++- foundation/bus/bus/hub.go | 65 ++++++++++++++++--------- foundation/bus/bus/message.go | 26 +++++++--- foundation/bus/bus/messagebus.go | 10 ++++ foundation/bus/bus/pod.go | 11 +++++ foundation/bus/bus/podconnection.go | 19 ++++++-- foundation/scheduler/core.go | 14 ++++-- foundation/scheduler/scheduler.go | 16 +++++- sat/engine2/engine.go | 5 +- sat/sat/meshed.go | 8 +-- sat/sat/sat.go | 2 +- 14 files changed, 194 insertions(+), 51 deletions(-) diff --git a/e2core/server/dispatcher.go b/e2core/server/dispatcher.go index ff4d1407..3d14098b 100644 --- a/e2core/server/dispatcher.go +++ b/e2core/server/dispatcher.go @@ -1,6 +1,7 @@ package server import ( + "context" "encoding/json" "fmt" "sync" @@ -11,6 +12,7 @@ import ( "github.com/suborbital/e2core/e2core/sequence" "github.com/suborbital/e2core/foundation/bus/bus" + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -55,7 +57,10 @@ func newDispatcher(l zerolog.Logger, pod *bus.Pod) *dispatcher { // Execute returns the "final state" of a Sequence. If the state's err is not nil, it means a runnable returned an error, and the Directive indicates the Sequence should return. // if exec itself actually returns an error other than ErrSequenceRunErr, it means there was a problem executing the Sequence as described, and should be treated as such. -func (d *dispatcher) Execute(seq *sequence.Sequence) error { +func (d *dispatcher) Execute(ctx context.Context, seq *sequence.Sequence) error { + ctx, span := tracing.Tracer.Start(ctx, "dispatcher.execute") + defer span.End() + ll := d.log.With().Str("requestID", seq.ParentID()).Logger() ll.Info().Interface("dispatcher-pod", d.pod).Msg("created a sequence dispatcher") s := &sequenceDispatcher{ @@ -83,7 +88,8 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { ll.Info().Interface("first-step", firstStep).Msg("dispatchsingle gets called on the sequence dispatcher. Arguments are the results channel and the first step.") - if err := s.dispatchSingle(firstStep, resultChan); err != nil { + span.AddEvent("dispatching single") + if err := s.dispatchSingle(ctx, firstStep, resultChan); err != nil { return errors.Wrap(err, "failed to dispatchSingle") } @@ -96,7 +102,7 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { if step == nil { break } else if step.IsSingle() { - if err := s.awaitResult(resultChan); err != nil { + if err := s.awaitResult(ctx, resultChan); err != nil { return errors.Wrap(err, "failed to awaitResult") } } else if step.IsGroup() { @@ -108,7 +114,10 @@ func (d *dispatcher) Execute(seq *sequence.Sequence) error { } // dispatchSingle executes a single plugin from a sequence step -func (s *sequenceDispatcher) dispatchSingle(step *sequence.Step, resultChan chan *sequence.ExecResult) error { +func (s *sequenceDispatcher) dispatchSingle(ctx context.Context, step *sequence.Step, resultChan chan *sequence.ExecResult) error { + ctx, span := tracing.Tracer.Start(ctx, "sequencedispatcher.dispatchsingle") + defer span.End() + data, err := s.seq.Request().ToJSON() if err != nil { return errors.Wrap(err, "failed to req.toJSON") @@ -116,7 +125,9 @@ func (s *sequenceDispatcher) dispatchSingle(step *sequence.Step, resultChan chan s.log.Info().Str("data in dispatchSingle", string(data)).Msg("message about to be sent") + span.AddEvent("created new message with parent id") msg := bus.NewMsgWithParentID(step.FQMN, s.seq.ParentID(), data) + msg.SetContext(ctx) s.log.Info().Interface("bus.Message", msg).Msg("bus msg. Next is pod.tunnel with step.fqmn with message.") @@ -129,12 +140,17 @@ func (s *sequenceDispatcher) dispatchSingle(step *sequence.Step, resultChan chan Str("msgUUID", msg.UUID()). Msg("dispatched execution for parent to peer with message") - return s.awaitResult(resultChan) + return s.awaitResult(ctx, resultChan) } -func (s *sequenceDispatcher) awaitResult(resultChan chan *sequence.ExecResult) error { +func (s *sequenceDispatcher) awaitResult(ctx context.Context, resultChan chan *sequence.ExecResult) error { + ctx, span := tracing.Tracer.Start(ctx, "awaitResult") + defer span.End() + select { case result := <-resultChan: + span.AddEvent("result came in the channel") + s.log.Info().Msg("we have a message back from the result channel") if result.Response == nil { s.log.Error().Msg("sadly the response was nil") @@ -147,6 +163,8 @@ func (s *sequenceDispatcher) awaitResult(resultChan chan *sequence.ExecResult) e return errors.Wrap(err, "failed to HandleStepResults") } case <-time.After(time.Second * 10): + span.AddEvent("10 seconds have passed, sad times") + s.log.Warn().Msg("dispatchSingle timeout reached") return ErrDispatchTimeout } diff --git a/e2core/server/handlers.go b/e2core/server/handlers.go index 73a8f34b..6a4c54c8 100644 --- a/e2core/server/handlers.go +++ b/e2core/server/handlers.go @@ -5,14 +5,22 @@ import ( "net/http" "github.com/labstack/echo/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/e2core/sequence" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/request" "github.com/suborbital/systemspec/tenant" ) func (s *Server) executePluginByNameHandler() echo.HandlerFunc { return func(c echo.Context) error { + ctx, span := tracing.Tracer.Start(c.Request().Context(), "executePluginByNameHandler") + defer span.End() + + c.SetRequest(c.Request().WithContext(ctx)) + // with the authorization middleware, this is going to be the uuid of the tenant specified by the path name in // the environment specified by the authorization token. ident := ReadParam(c, "ident") @@ -30,6 +38,11 @@ func (s *Server) executePluginByNameHandler() echo.HandlerFunc { Str("fn", name). Logger() + span.AddEvent("grabbing module by name", trace.WithAttributes( + attribute.String("ident", ident), + attribute.String("namespace", namespace), + attribute.String("name", name), + )) mod := s.syncer.GetModuleByName(ident, namespace, name) if mod == nil { ll.Error().Msg("syncer did not find module by these details") @@ -52,13 +65,15 @@ func (s *Server) executePluginByNameHandler() echo.HandlerFunc { steps := []tenant.WorkflowStep{{FQMN: mod.FQMN}} + span.AddEvent("sequence.New from req") + // a sequence executes the handler's steps and manages its state. seq, err := sequence.New(steps, req) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "failed to handle request").SetInternal(err) } - if err := s.dispatcher.Execute(seq); err != nil { + if err := s.dispatcher.Execute(c.Request().Context(), seq); err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "failed to execute plugin").SetInternal(err) } diff --git a/foundation/bus/bus/bus.go b/foundation/bus/bus/bus.go index 9223e027..8a44bf9b 100644 --- a/foundation/bus/bus/bus.go +++ b/foundation/bus/bus/bus.go @@ -4,6 +4,10 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/e2core/foundation/tracing" ) // ErrTransportNotConfigured represent package-level vars @@ -71,8 +75,15 @@ func (b *Bus) ConnectBridgeTopic(topic string) error { // This bypasses the main Bus bus, which is why it isn't a method on Pod. // Messages are load balanced between the connections that advertise the capability in question. func (b *Bus) Tunnel(capability string, msg Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "bus.Tunnel", trace.WithAttributes( + attribute.String("capability", capability), + )) + defer span.End() + + msg.SetContext(ctx) + b.logger.Info().Str("requestID", msg.ParentID()).Str("fqmn", capability).Msg("bus.Tunnel is happening (pod tunnelfunc?)") - return b.hub.sendTunneledMessage(capability, msg) + return b.hub.sendTunneledMessage(ctx, capability, msg) } // Withdraw cancels discovery, sends withdraw messages to all peers, diff --git a/foundation/bus/bus/connectionhandler.go b/foundation/bus/bus/connectionhandler.go index e5c0524a..e0402609 100644 --- a/foundation/bus/bus/connectionhandler.go +++ b/foundation/bus/bus/connectionhandler.go @@ -1,10 +1,13 @@ package bus import ( + "context" + "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/suborbital/e2core/foundation/bus/bus/withdraw" + "github.com/suborbital/e2core/foundation/tracing" ) type connectionHandler struct { @@ -68,9 +71,13 @@ func (c *connectionHandler) Start() { }() } -func (c *connectionHandler) Send(msg Message) error { +func (c *connectionHandler) Send(ctx context.Context, msg Message) error { + ctx, span := tracing.Tracer.Start(ctx, "connectionHandler.send") + defer span.End() + ll := c.Log.With().Str("requestID", msg.ParentID()).Logger() if c.Signaler.PeerWithdrawn() { + span.AddEvent("peer withdrawn") return ErrNodeWithdrawn } diff --git a/foundation/bus/bus/hub.go b/foundation/bus/bus/hub.go index dcb13e45..54acd788 100644 --- a/foundation/bus/bus/hub.go +++ b/foundation/bus/bus/hub.go @@ -1,14 +1,18 @@ package bus import ( + "context" "sync" "time" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/bus/bus/tunnel" "github.com/suborbital/e2core/foundation/bus/bus/withdraw" + "github.com/suborbital/e2core/foundation/tracing" ) const tunnelRetryCount = 32 @@ -106,6 +110,11 @@ func initHub(nodeUUID string, options *Options, connectFunc func() *Pod) *hub { // messageHandler takes each message coming from the bus and sends it to currently active mesh connections func (h *hub) messageHandler(msg Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "hub messagehandler") + defer span.End() + + msg.SetContext(ctx) + h.lock.RLock() defer h.lock.RUnlock() @@ -120,7 +129,7 @@ func (h *hub) messageHandler(msg Message) error { Msg("sending the message to the handler at this uuid") handler := h.meshConnections[uuid] - err := handler.Send(msg) + err := handler.Send(ctx, msg) if err != nil { ll.Err(err).Str("meshconnection-uuid", uuid). Msg("send returned an error") @@ -403,7 +412,10 @@ func (h *hub) scanFailedMeshConnections() { } } -func (h *hub) sendTunneledMessage(capability string, msg Message) error { +func (h *hub) sendTunneledMessage(ctx context.Context, capability string, msg Message) error { + ctx, span := tracing.Tracer.Start(ctx, "hub.sendtunneledmessage") + defer span.End() + ll := h.log.With().Str("method", "sendTunneledMessage"). Str("requestID", msg.ParentID()).Logger() @@ -418,34 +430,43 @@ func (h *hub) sendTunneledMessage(capability string, msg Message) error { ll.Info().Int("tunnel-retry-count", tunnelRetryCount).Msg("starting iteration to check whether we can send a message to someplace") - // iterate a reasonable number of times to find a connection that's not removed or dead - for i := 0; i < tunnelRetryCount; i++ { + handlerFactory := func(ctx context.Context) (*connectionHandler, error) { + ctx, span := tracing.Tracer.Start(ctx, "handlerFactory") + defer span.End() - // wrap this in a function to avoid any sloppy mutex issues - handler, err := func() (*connectionHandler, error) { - h.lock.RLock() - defer h.lock.RUnlock() - - uuid := balancer.Next() - ll.Info().Int("iteration", i).Str("uuid", uuid).Msg("balancer next") - if uuid == "" { - return nil, ErrTunnelNotEstablished - } + h.lock.RLock() + defer h.lock.RUnlock() - handler, exists := h.meshConnections[uuid] - ll.Info().Bool("handler-exists", exists).Msg("hub has a meshconnections map") - if !exists { - return nil, ErrTunnelNotEstablished - } + uuid := balancer.Next() + if uuid == "" { + span.AddEvent("balancer doesn't exit") + return nil, ErrTunnelNotEstablished + } - return handler, nil - }() + handler, exists := h.meshConnections[uuid] + if !exists { + span.AddEvent("handler doesn't exist for uuid", trace.WithAttributes( + attribute.String("uuid", uuid), + )) + return nil, ErrTunnelNotEstablished + } + + span.AddEvent("returning a handler for uuid", trace.WithAttributes( + attribute.String("uuid", uuid), + )) + return handler, nil + } + + // iterate a reasonable number of times to find a connection that's not removed or dead + for i := 0; i < tunnelRetryCount; i++ { + // wrap this in a function to avoid any sloppy mutex issues + handler, err := handlerFactory(ctx) if err != nil { continue } if handler.Conn != nil { - if err := handler.Send(msg); err != nil { + if err := handler.Send(ctx, msg); err != nil { ll.Err(err).Msg("failed to SendMsg on tunneled connection, will remove") return errors.Wrap(err, "handler.Send died") } else { diff --git a/foundation/bus/bus/message.go b/foundation/bus/bus/message.go index a42982a2..8a844dd0 100644 --- a/foundation/bus/bus/message.go +++ b/foundation/bus/bus/message.go @@ -1,6 +1,7 @@ package bus import ( + "context" "encoding/json" "io" "net/http" @@ -23,19 +24,19 @@ type MsgChan chan Message // Message represents a message type Message interface { - // Unique ID for this message + // UUID is the unique ID for this message UUID() string - // ID of the parent event or request, such as HTTP request + // ParentID is the request ID of the parent event or request, such as HTTP request ParentID() string - // The UUID of the message being replied to, if any + // ReplyTo is the UUID of the message being replied to, if any ReplyTo() string - // Allow setting a message UUID that this message is a response to + // SetReplyTo allows setting a message UUID that this message is a response to SetReplyTo(string) // Type of message (application-specific) Type() string - // Time the message was sent + // Timestamp returns the time the message was sent Timestamp() time.Time - // Raw data of message + // Data returns raw data of message Data() []byte // Marshal the message itself to encoded bytes (JSON or otherwise) Marshal() ([]byte, error) @@ -45,6 +46,10 @@ type Message interface { MarshalMetadata() ([]byte, error) // UnmarshalMetadata encoded metadata into object UnmarshalMetadata([]byte) error + // Context will return the embedded context + Context() context.Context + // SetContext will set the new context on the message + SetContext(ctx context.Context) } // NewMsg creates a new Message with the built-in `_message` type @@ -128,6 +133,15 @@ func newMessage(msgType, parentID string, data []byte) Message { type _message struct { Meta _meta `json:"meta"` Payload _payload `json:"payload"` + ctx context.Context +} + +func (m *_message) SetContext(ctx context.Context) { + m.ctx = ctx +} + +func (m *_message) Context() context.Context { + return m.ctx } type _meta struct { diff --git a/foundation/bus/bus/messagebus.go b/foundation/bus/bus/messagebus.go index 96057198..9d2018a6 100644 --- a/foundation/bus/bus/messagebus.go +++ b/foundation/bus/bus/messagebus.go @@ -1,5 +1,9 @@ package bus +import ( + "github.com/suborbital/e2core/foundation/tracing" +) + const ( defaultBusChanSize = 256 ) @@ -38,6 +42,7 @@ func (b *messageBus) start() { // each connection until landing back at the beginning of the // ring, and repeat forever when each new message arrives for msg := range b.busChan { + for { // make sure the next pod is ready for messages if err := b.pool.prepareNext(b.buffer); err == nil { @@ -55,6 +60,11 @@ func (b *messageBus) start() { } func (b *messageBus) traverse(msg Message, start *podConnection) { + ctx, span := tracing.Tracer.Start(msg.Context(), "messagebus.traverse") + defer span.End() + + msg.SetContext(ctx) + startID := start.ID conn := start diff --git a/foundation/bus/bus/pod.go b/foundation/bus/bus/pod.go index 7c9783cd..2a3b1ae6 100644 --- a/foundation/bus/bus/pod.go +++ b/foundation/bus/bus/pod.go @@ -6,6 +6,10 @@ import ( "sync/atomic" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -122,6 +126,13 @@ func (p *Pod) Send(msg Message) *MsgReceipt { // Tunnel bypasses the pod's normal 'Send' and uses the bus itself to tunnel to a specific peer // if a transport is enabled. If not, it's a no-op. func (p *Pod) Tunnel(capability string, msg Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "pod.Tunnel", trace.WithAttributes( + attribute.String("capability", capability), + )) + defer span.End() + + msg.SetContext(ctx) + p.logger.Info().Str("requestID", msg.ParentID()).Str("fqmn", capability).Msg("tunneling using the pod's tunnelfunc") return p.tunnelFunc(capability, msg) } diff --git a/foundation/bus/bus/podconnection.go b/foundation/bus/bus/podconnection.go index 8646aa2d..4b3f5c5a 100644 --- a/foundation/bus/bus/podconnection.go +++ b/foundation/bus/bus/podconnection.go @@ -1,6 +1,11 @@ package bus -import "sync" +import ( + "context" + "sync" + + "github.com/suborbital/e2core/foundation/tracing" +) // podConnection is a connection to a pod via its messageChan // podConnection is also a circular linked list/ring of connections @@ -47,7 +52,15 @@ func newPodConnection(id int64, pod *Pod) *podConnection { // ordering to the messageChan if it becomes full is not guaranteed, this // is sacrificed to ensure that the bus does not block because of a delinquient pod func (p *podConnection) send(msg Message) { - go func() { + ctx, span := tracing.Tracer.Start(msg.Context(), "podconnection.send") + defer span.End() + + msg.SetContext(ctx) + + go func(gctx context.Context) { + _, gspan := tracing.Tracer.Start(gctx, "go func inside podconnection.send") + defer gspan.End() + p.lock.RLock() defer p.lock.RUnlock() @@ -57,7 +70,7 @@ func (p *podConnection) send(msg Message) { } p.messageChan <- msg - }() + }(ctx) } // checkStatus checks the pod's feedback for any information or failed messages and drains the failures into the failed Message buffer diff --git a/foundation/scheduler/core.go b/foundation/scheduler/core.go index 7c45e1d4..0e27531f 100644 --- a/foundation/scheduler/core.go +++ b/foundation/scheduler/core.go @@ -39,7 +39,14 @@ func newCore(log zerolog.Logger) *core { func (c *core) do(job *Job) *Result { result := newResult(job.UUID()) - c.log.Info().Msg("core.do function got called") + rid := "no-request" + if job.Req() != nil { + rid = job.Req().ID + } + + ll := c.log.With().Str("requestID", rid).Logger() + + ll.Info().Msg("core.do function got called") jobWorker := c.scaler.findWorker(job.jobType) if jobWorker == nil { @@ -49,11 +56,12 @@ func (c *core) do(job *Job) *Result { go func() { job.result = result - c.log.Info().Msg("jobworker got a job scheduled") + ll.Info().Msg("jobworker got a job scheduled") + jobWorker.schedule(job) }() - c.log.Info().Msg("returning result from core.do func") + ll.Info().Msg("returning result from core.do func") return result } diff --git a/foundation/scheduler/scheduler.go b/foundation/scheduler/scheduler.go index ce1188f7..a984e6fb 100644 --- a/foundation/scheduler/scheduler.go +++ b/foundation/scheduler/scheduler.go @@ -50,7 +50,15 @@ func NewWithLogger(log zerolog.Logger) *Scheduler { // Do schedules a job to be worked on and returns a result object func (r *Scheduler) Do(job Job) *Result { - r.log.Info().Msg("scheduler.Do function got called, passing it on to core.do") + if job.Req() == nil { + r.log.Info(). + Str("requestID", "no-request"). + Msg("scheduler.Do function got called, passing it on to core.do") + } else { + r.log.Info(). + Str("requestID", job.Req().ID). + Msg("scheduler.Do function got called, passing it on to core.do") + } return r.core.do(&job) } @@ -136,7 +144,11 @@ func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Mess // each time a message is received with the associated type, // execute the associated job and pass the result to `run` pod.OnType(msgType, func(msg bus.Message) error { - r.log.Info().Str("msgType", msgType).Msg("scheduler.ListenAndRun called, msg turned into a job, and job passed to scheduler.Do function") + + r.log.Info(). + Str("msgType", msgType). + Str("requestID", msg.ParentID()). + Msg("scheduler.ListenAndRun called, msg turned into a job, and job passed to scheduler.Do function") result, err := helper(msg.Data()).Then() diff --git a/sat/engine2/engine.go b/sat/engine2/engine.go index 513bab5b..b6db45b9 100644 --- a/sat/engine2/engine.go +++ b/sat/engine2/engine.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/suborbital/e2core/foundation/scheduler" "github.com/suborbital/e2core/sat/engine2/api" @@ -19,9 +20,9 @@ type Engine struct { } // New creates a new Engine with the default API -func New(name string, ref *tenant.WasmModuleRef, api api.HostAPI) *Engine { +func New(name string, ref *tenant.WasmModuleRef, api api.HostAPI, logger zerolog.Logger) *Engine { e := &Engine{ - Scheduler: scheduler.New(), + Scheduler: scheduler.NewWithLogger(logger.With().Str("component", "engine.scheduler").Logger()), } runner := newRunnerFromRef(ref, api) diff --git a/sat/sat/meshed.go b/sat/sat/meshed.go index cca0b8d0..4d803dc9 100644 --- a/sat/sat/meshed.go +++ b/sat/sat/meshed.go @@ -147,11 +147,13 @@ func (s *Sat) sendFnResult(result *sequence.ExecResult, ctx context.Context) err } func (s *Sat) sendNextStep(msg bus.Message, seq *sequence.Sequence, req *request.CoordinatedRequest, ctx context.Context) { - ll := s.logger.With().Str("method", "sendNextStep").Str("requestID", msg.ParentID()).Logger() - - span := trace.SpanFromContext(ctx) + ctx, span := tracing.Tracer.Start(msg.Context(), "sat.sendNextStep") defer span.End() + msg.SetContext(ctx) + + ll := s.logger.With().Str("method", "sendNextStep").Str("requestID", msg.ParentID()).Logger() + nextStep := seq.NextStep() if nextStep == nil { ll.Debug().Msg("sequence completed, no nextStep message to send") diff --git a/sat/sat/sat.go b/sat/sat/sat.go index af978c13..fdbe0289 100644 --- a/sat/sat/sat.go +++ b/sat/sat/sat.go @@ -58,7 +58,7 @@ func New(config *Config, logger zerolog.Logger, mtx metrics.Metrics) (*Sat, erro return nil, errors.Wrap(err, "failed to NewWithConfig") } - engine := engine2.New(config.JobType, module, engineAPI) + engine := engine2.New(config.JobType, module, engineAPI, logger) sat := &Sat{ config: config, From b31f994a36bcf6780237e01417aed75fd95ca150 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Tue, 16 May 2023 10:48:25 +0100 Subject: [PATCH 17/21] Add request ID to main trace in handler func --- e2core/server/handlers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/e2core/server/handlers.go b/e2core/server/handlers.go index 6a4c54c8..788c0245 100644 --- a/e2core/server/handlers.go +++ b/e2core/server/handlers.go @@ -16,7 +16,9 @@ import ( func (s *Server) executePluginByNameHandler() echo.HandlerFunc { return func(c echo.Context) error { - ctx, span := tracing.Tracer.Start(c.Request().Context(), "executePluginByNameHandler") + ctx, span := tracing.Tracer.Start(c.Request().Context(), "executePluginByNameHandler", trace.WithAttributes( + attribute.String("request_id", c.Response().Header().Get(echo.HeaderXRequestID)), + )) defer span.End() c.SetRequest(c.Request().WithContext(ctx)) From 4fd8d7e9cd678fe1206f390a032ad8d207160688 Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Tue, 16 May 2023 10:50:27 +0100 Subject: [PATCH 18/21] Add a way to propagate trace data between service boundaries on message --- foundation/bus/bus/connectionhandler.go | 8 ++++ foundation/bus/bus/message.go | 45 ++++++++++++++++--- .../bus/transport/websocket/transport.go | 15 +++++++ foundation/tracing/tracing.go | 2 + 4 files changed, 63 insertions(+), 7 deletions(-) diff --git a/foundation/bus/bus/connectionhandler.go b/foundation/bus/bus/connectionhandler.go index e0402609..5a0f6007 100644 --- a/foundation/bus/bus/connectionhandler.go +++ b/foundation/bus/bus/connectionhandler.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" "github.com/suborbital/e2core/foundation/bus/bus/withdraw" "github.com/suborbital/e2core/foundation/tracing" @@ -64,9 +65,16 @@ func (c *connectionHandler) Start() { return } + ctx := otel.GetTextMapPropagator().Extract(context.Background(), msg) + ctx, span := tracing.Tracer.Start(ctx, "connectionHandler.ReadMsg") + + msg.SetContext(ctx) + ll.Debug().Str("messageUUID", msg.UUID()).Str("requestID", msg.ParentID()).Msg("received message") c.Pod.Send(msg) + + span.End() } }() } diff --git a/foundation/bus/bus/message.go b/foundation/bus/bus/message.go index 8a844dd0..1e0c03f8 100644 --- a/foundation/bus/bus/message.go +++ b/foundation/bus/bus/message.go @@ -3,11 +3,13 @@ package bus import ( "context" "encoding/json" + "fmt" "io" "net/http" "time" "github.com/google/uuid" + "go.opentelemetry.io/otel/propagation" ) // MsgTypeDefault and other represent message consts @@ -24,6 +26,8 @@ type MsgChan chan Message // Message represents a message type Message interface { + propagation.TextMapCarrier + // UUID is the unique ID for this message UUID() string // ParentID is the request ID of the parent event or request, such as HTTP request @@ -122,6 +126,7 @@ func newMessage(msgType, parentID string, data []byte) Message { Payload: _payload{ Data: data, }, + TraceInfo: make(map[string]string), } return m @@ -131,19 +136,37 @@ func newMessage(msgType, parentID string, data []byte) Message { // most applications should define their own data structure // that implements the interface type _message struct { - Meta _meta `json:"meta"` - Payload _payload `json:"payload"` - ctx context.Context + Meta _meta `json:"meta"` + Payload _payload `json:"payload"` + ctx context.Context + TraceInfo map[string]string `json:"trace_info"` } -func (m *_message) SetContext(ctx context.Context) { - m.ctx = ctx +var _ Message = &_message{} + +func (m *_message) Get(key string) string { + fmt.Printf("\n\ngetting '%s' from message traceinfo\n\n", key) + return (*m).TraceInfo[key] } -func (m *_message) Context() context.Context { - return m.ctx +func (m *_message) Set(key string, value string) { + fmt.Printf("\n\nsetting '%s' to '%s' in message traceinfo\n\n", key, value) + (*m).TraceInfo[key] = value +} + +func (m *_message) Keys() []string { + keys := make([]string, 0, len(m.TraceInfo)) + for k := range m.TraceInfo { + keys = append(keys, k) + } + + fmt.Printf("\n\ngetting keys from message and they are '%v'\n\n", keys) + + return keys } +var _ Message = &_message{} + type _meta struct { UUID string `json:"uuid"` ParentID string `json:"parent_id"` @@ -210,3 +233,11 @@ func (m *_message) MarshalMetadata() ([]byte, error) { func (m *_message) UnmarshalMetadata(bytes []byte) error { return json.Unmarshal(bytes, &m.Meta) } + +func (m *_message) SetContext(ctx context.Context) { + m.ctx = ctx +} + +func (m *_message) Context() context.Context { + return m.ctx +} diff --git a/foundation/bus/transport/websocket/transport.go b/foundation/bus/transport/websocket/transport.go index aa8bd650..8042140e 100644 --- a/foundation/bus/transport/websocket/transport.go +++ b/foundation/bus/transport/websocket/transport.go @@ -11,6 +11,9 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/bus/bus" "github.com/suborbital/e2core/foundation/tracing" @@ -124,10 +127,22 @@ func (t *Transport) HTTPHandlerFunc() http.HandlerFunc { // SendMsg sends a message to the connection func (c *Conn) SendMsg(msg bus.Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "conn.SendMsg", trace.WithAttributes( + attribute.String("request ID", msg.ParentID()), + )) + defer span.End() + + span.AddEvent("injecting the ctx into the message") + fmt.Printf("\n\n!!!!\n\ninjecting context into message\n") + otel.GetTextMapPropagator().Inject(ctx, msg) + fmt.Printf("\n\n---\n\ndone injecting context into message\n") + ll := c.log.With().Str("requestID", msg.ParentID()). Str("msg-uuid", msg.UUID()). Str("node-uuid", c.nodeUUID).Logger() + ll.Info().Strs("traceinfo-keys", msg.Keys()).Msg("uh what") + msgBytes, err := msg.Marshal() if err != nil { return errors.Wrap(err, "[transport-websocket] failed to Marshal message") diff --git a/foundation/tracing/tracing.go b/foundation/tracing/tracing.go index 41af70a2..356dd62e 100644 --- a/foundation/tracing/tracing.go +++ b/foundation/tracing/tracing.go @@ -106,5 +106,7 @@ func SetupTracing(config Config, logger zerolog.Logger) (*sdkTrace.TracerProvide ll.Info().Msg("setting up a global tracer") Tracer = traceProvider.Tracer("e2core-bebby-tracing") + otel.SetTextMapPropagator(propagation.TraceContext{}) + return traceProvider, nil } From 6dbb5149e55755785856a5c4330306d9e6b6c7ed Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Tue, 16 May 2023 10:50:58 +0100 Subject: [PATCH 19/21] Add contexts and start spans to connect the dots --- e2core/syncer/syncer.go | 3 +- foundation/bus/bus/connectionhandler.go | 2 ++ foundation/bus/bus/pod.go | 6 ++++ foundation/bus/bus/podconnection.go | 10 +++--- .../bus/transport/websocket/transport.go | 2 +- foundation/scheduler/core.go | 26 +++++++++++--- foundation/scheduler/ctx.go | 2 +- foundation/scheduler/scheduler.go | 35 ++++++++++++++----- foundation/scheduler/watcher.go | 7 ++-- foundation/scheduler/worker.go | 16 +++++++-- foundation/tracing/tracing.go | 2 ++ sat/sat/embedded.go | 9 +++-- sat/sat/handler.go | 2 +- 13 files changed, 93 insertions(+), 29 deletions(-) diff --git a/e2core/syncer/syncer.go b/e2core/syncer/syncer.go index aa60f3cd..73d695dd 100644 --- a/e2core/syncer/syncer.go +++ b/e2core/syncer/syncer.go @@ -1,6 +1,7 @@ package syncer import ( + "context" "sync" "github.com/pkg/errors" @@ -61,7 +62,7 @@ func (s *Syncer) Start() error { } // sync once to seed the initial state - if _, err := s.sched.Do(scheduler.NewJob("sync", nil)).Then(); err != nil { + if _, err := s.sched.Do(context.Background(), scheduler.NewJob("sync", nil)).Then(); err != nil { return errors.Wrap(err, "failed to Do sync job") } diff --git a/foundation/bus/bus/connectionhandler.go b/foundation/bus/bus/connectionhandler.go index 5a0f6007..229f1036 100644 --- a/foundation/bus/bus/connectionhandler.go +++ b/foundation/bus/bus/connectionhandler.go @@ -83,6 +83,8 @@ func (c *connectionHandler) Send(ctx context.Context, msg Message) error { ctx, span := tracing.Tracer.Start(ctx, "connectionHandler.send") defer span.End() + msg.SetContext(ctx) + ll := c.Log.With().Str("requestID", msg.ParentID()).Logger() if c.Signaler.PeerWithdrawn() { span.AddEvent("peer withdrawn") diff --git a/foundation/bus/bus/pod.go b/foundation/bus/bus/pod.go index 2a3b1ae6..72071f6d 100644 --- a/foundation/bus/bus/pod.go +++ b/foundation/bus/bus/pod.go @@ -95,6 +95,11 @@ func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts, // It is safe to call methods on a nil ticket, they will error with ErrNoTicket // This means error checking can be done on a chained call such as err := p.Send(msg).Wait(...) func (p *Pod) Send(msg Message) *MsgReceipt { + ctx, span := tracing.Tracer.Start(msg.Context(), "pod.Send") + defer span.End() + + msg.SetContext(ctx) + ll := p.logger.With().Str("requestID", msg.ParentID()).Logger() ll.Info().Msg("sending message in pod.send") @@ -111,6 +116,7 @@ func (p *Pod) Send(msg Message) *MsgReceipt { ll.Info().Msg("sending message to the bus chan") + span.AddEvent("sending message to the bus channel") p.busChan <- msg ll.Info().Msg("sent message to bus chan") diff --git a/foundation/bus/bus/podconnection.go b/foundation/bus/bus/podconnection.go index 4b3f5c5a..1892aa80 100644 --- a/foundation/bus/bus/podconnection.go +++ b/foundation/bus/bus/podconnection.go @@ -57,10 +57,12 @@ func (p *podConnection) send(msg Message) { msg.SetContext(ctx) - go func(gctx context.Context) { - _, gspan := tracing.Tracer.Start(gctx, "go func inside podconnection.send") + go func(gctx context.Context, gmsg Message) { + gctx, gspan := tracing.Tracer.Start(gctx, "go func inside podconnection.send") defer gspan.End() + gmsg.SetContext(gctx) + p.lock.RLock() defer p.lock.RUnlock() @@ -69,8 +71,8 @@ func (p *podConnection) send(msg Message) { return } - p.messageChan <- msg - }(ctx) + p.messageChan <- gmsg + }(ctx, msg) } // checkStatus checks the pod's feedback for any information or failed messages and drains the failures into the failed Message buffer diff --git a/foundation/bus/transport/websocket/transport.go b/foundation/bus/transport/websocket/transport.go index 8042140e..6ee14579 100644 --- a/foundation/bus/transport/websocket/transport.go +++ b/foundation/bus/transport/websocket/transport.go @@ -148,7 +148,7 @@ func (c *Conn) SendMsg(msg bus.Message) error { return errors.Wrap(err, "[transport-websocket] failed to Marshal message") } - ll.Info().Msg("sending message to connection over binary") + ll.Info().Str("messagebytes", string(msgBytes)).Msg("sending message to connection over binary") if err := c.WriteMessage(websocket.BinaryMessage, msgBytes); err != nil { if errors.Is(err, websocket.ErrCloseSent) { diff --git a/foundation/scheduler/core.go b/foundation/scheduler/core.go index 0e27531f..ca8d17a3 100644 --- a/foundation/scheduler/core.go +++ b/foundation/scheduler/core.go @@ -1,16 +1,21 @@ package scheduler import ( + "context" "fmt" "sync" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/suborbital/e2core/foundation/tracing" ) // coreDoFunc is an internal version of DoFunc that takes a // Job pointer instead of a Job value for the best memory usage -type coreDoFunc func(job *Job) *Result +type coreDoFunc func(ctx context.Context, job *Job) *Result // core is the 'core scheduler' for reactr, handling execution of // Tasks, Jobs, and Schedules @@ -36,8 +41,13 @@ func newCore(log zerolog.Logger) *core { return c } -func (c *core) do(job *Job) *Result { +func (c *core) do(ctx context.Context, job *Job) *Result { + ctx, span := tracing.Tracer.Start(ctx, "core.do") + defer span.End() + result := newResult(job.UUID()) + span.AddEvent("created a new job", trace.WithAttributes( + attribute.String("job-uuid", job.UUID()))) rid := "no-request" if job.Req() != nil { @@ -48,18 +58,24 @@ func (c *core) do(job *Job) *Result { ll.Info().Msg("core.do function got called") + span.AddEvent("core.scaler.findWorder for job type", trace.WithAttributes( + attribute.String("jobType", job.jobType), + )) jobWorker := c.scaler.findWorker(job.jobType) if jobWorker == nil { result.sendErr(fmt.Errorf("failed to getWorker for jobType %q", job.jobType)) return result } - go func() { + go func(goctx context.Context) { + ctx, span := tracing.Tracer.Start(goctx, "go func inside core.do") + defer span.End() + job.result = result ll.Info().Msg("jobworker got a job scheduled") - jobWorker.schedule(job) - }() + jobWorker.schedule(ctx, job) + }(ctx) ll.Info().Msg("returning result from core.do func") return result diff --git a/foundation/scheduler/ctx.go b/foundation/scheduler/ctx.go index 29f50fd7..b4f69382 100644 --- a/foundation/scheduler/ctx.go +++ b/foundation/scheduler/ctx.go @@ -39,7 +39,7 @@ func newCtx(doFunc coreDoFunc) *Ctx { // Do runs a new job func (c *Ctx) Do(job Job) *Result { - return c.doFunc(&job) + return c.doFunc(c.Context, &job) } func (c *Ctx) SetFFIResult(result []byte, err error) (*FFIResult, error) { diff --git a/foundation/scheduler/scheduler.go b/foundation/scheduler/scheduler.go index a984e6fb..97104545 100644 --- a/foundation/scheduler/scheduler.go +++ b/foundation/scheduler/scheduler.go @@ -1,13 +1,17 @@ package scheduler import ( + "context" "encoding/json" "os" "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/suborbital/e2core/foundation/bus/bus" + "github.com/suborbital/e2core/foundation/tracing" ) // MsgTypeReactrJobErr and others are Grav message types used for Scheduler job @@ -19,7 +23,7 @@ const ( ) // JobFunc is a function that runs a job of a predetermined type -type JobFunc func(interface{}) *Result +type JobFunc func(context.Context, interface{}) *Result // Scheduler represents the main control object type Scheduler struct { @@ -49,7 +53,10 @@ func NewWithLogger(log zerolog.Logger) *Scheduler { } // Do schedules a job to be worked on and returns a result object -func (r *Scheduler) Do(job Job) *Result { +func (r *Scheduler) Do(ctx context.Context, job Job) *Result { + ctx, span := tracing.Tracer.Start(ctx, "scheduler do with job") + defer span.End() + if job.Req() == nil { r.log.Info(). Str("requestID", "no-request"). @@ -60,7 +67,7 @@ func (r *Scheduler) Do(job Job) *Result { Msg("scheduler.Do function got called, passing it on to core.do") } - return r.core.do(&job) + return r.core.do(ctx, &job) } // Schedule adds a new Schedule to the instance, Scheduler will 'watch' the Schedule @@ -73,8 +80,8 @@ func (r *Scheduler) Schedule(s Schedule) { func (r *Scheduler) Register(jobType string, runner Runnable, options ...Option) JobFunc { r.core.register(jobType, runner, options...) - helper := func(data interface{}) *Result { - return r.Do(NewJob(jobType, data)) + helper := func(ctx context.Context, data interface{}) *Result { + return r.Do(ctx, NewJob(jobType, data)) } return helper @@ -135,22 +142,34 @@ func (r *Scheduler) Listen(pod *bus.Pod, msgType string) { // ListenAndRun subscribes Scheduler to a messageType and calls `run` for each job result func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Message, interface{}, error)) { - helper := func(data interface{}) *Result { + helper := func(ctx context.Context, data interface{}) *Result { + ctx, span := tracing.Tracer.Start(ctx, "helper function") + defer span.End() + + span.AddEvent("new job from data and msg type", trace.WithAttributes( + attribute.String("msgType", msgType), + )) job := NewJob(msgType, data) - return r.Do(job) + return r.Do(ctx, job) } // each time a message is received with the associated type, // execute the associated job and pass the result to `run` pod.OnType(msgType, func(msg bus.Message) error { + ctx, span := tracing.Tracer.Start(msg.Context(), "scheduler.ListenAndRun", trace.WithAttributes( + attribute.String("msgType", msgType), + )) + defer span.End() + msg.SetContext(ctx) r.log.Info(). Str("msgType", msgType). Str("requestID", msg.ParentID()). Msg("scheduler.ListenAndRun called, msg turned into a job, and job passed to scheduler.Do function") - result, err := helper(msg.Data()).Then() + span.AddEvent("sending msg.data to the helper") + result, err := helper(ctx, msg.Data()).Then() run(msg, result, err) diff --git a/foundation/scheduler/watcher.go b/foundation/scheduler/watcher.go index cf4a3d2a..4ef5f14c 100644 --- a/foundation/scheduler/watcher.go +++ b/foundation/scheduler/watcher.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "sync" "time" @@ -11,13 +12,13 @@ import ( // them for new jobs to send to the scheduler type watcher struct { schedules map[string]Schedule - scheduleFunc func(*Job) *Result + scheduleFunc coreDoFunc lock sync.RWMutex startOnce sync.Once } -func newWatcher(scheduleFunc func(*Job) *Result) *watcher { +func newWatcher(scheduleFunc coreDoFunc) *watcher { w := &watcher{ schedules: make(map[string]Schedule), scheduleFunc: scheduleFunc, @@ -53,7 +54,7 @@ func (w *watcher) watch(sched Schedule) { } else { if job := s.Check(); job != nil { // schedule the job and discard the result - w.scheduleFunc(job).Discard() + w.scheduleFunc(context.Background(), job).Discard() } } } diff --git a/foundation/scheduler/worker.go b/foundation/scheduler/worker.go index b58db676..18f179b8 100644 --- a/foundation/scheduler/worker.go +++ b/foundation/scheduler/worker.go @@ -1,11 +1,14 @@ package scheduler import ( + "context" "sync" "time" "github.com/pkg/errors" "golang.org/x/sync/singleflight" + + "github.com/suborbital/e2core/foundation/tracing" ) const ( @@ -48,16 +51,23 @@ func newWorker(runner Runnable, doFunc coreDoFunc, opts workerOpts) *worker { return w } -func (w *worker) schedule(job *Job) { - go func() { +func (w *worker) schedule(ctx context.Context, job *Job) { + ctx, span := tracing.Tracer.Start(ctx, "worker.schedule") + defer span.End() + + go func(goctx context.Context) { + _, span := tracing.Tracer.Start(goctx, "go func inside worker.schedule") + defer span.End() + if err := w.reconcilePoolSize(); err != nil { job.result.sendErr(errors.Wrap(err, "failed to reconcilePoolSize")) return } + span.AddEvent("adding job to the worker workchannel and incrementing the rate by one") w.workChan <- job w.rate.add() - }() + }(ctx) } // start ensures the worker is ready to receive jobs diff --git a/foundation/tracing/tracing.go b/foundation/tracing/tracing.go index 356dd62e..84ff2b6e 100644 --- a/foundation/tracing/tracing.go +++ b/foundation/tracing/tracing.go @@ -6,6 +6,8 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" sdkTrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" diff --git a/sat/sat/embedded.go b/sat/sat/embedded.go index 982e089f..89b628c3 100644 --- a/sat/sat/embedded.go +++ b/sat/sat/embedded.go @@ -1,17 +1,22 @@ package sat import ( + "context" "net/http" "github.com/google/uuid" "github.com/pkg/errors" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/request" ) // Exec takes input bytes, executes the loaded module, and returns the result -func (s *Sat) Exec(input []byte) (*request.CoordinatedResponse, error) { +func (s *Sat) Exec(ctx context.Context, input []byte) (*request.CoordinatedResponse, error) { + ctx, span := tracing.Tracer.Start(ctx, "sat.Exec") + defer span.End() + // construct a fake HTTP request from the input req := &request.CoordinatedRequest{ Method: http.MethodPost, @@ -24,7 +29,7 @@ func (s *Sat) Exec(input []byte) (*request.CoordinatedResponse, error) { State: map[string][]byte{}, } - result, err := s.engine.Do(scheduler.NewJob(s.config.JobType, req)).Then() + result, err := s.engine.Do(ctx, scheduler.NewJob(s.config.JobType, req)).Then() if err != nil { return nil, errors.Wrap(err, "failed to exec") } diff --git a/sat/sat/handler.go b/sat/sat/handler.go index 0b8e63a3..36095ebf 100644 --- a/sat/sat/handler.go +++ b/sat/sat/handler.go @@ -40,7 +40,7 @@ func (s *Sat) handler(engine *engine2.Engine) echo.HandlerFunc { return echo.NewHTTPError(http.StatusInternalServerError, "unknown error").SetInternal(fmt.Errorf("module %s is not registered", s.config.JobType)) } - result, err := engine.Do(scheduler.NewJob(s.config.JobType, req)).Then() + result, err := engine.Do(spanCtx, scheduler.NewJob(s.config.JobType, req)).Then() if err != nil { if errors.As(err, &runErr) { // runErr would be an actual error returned from a function From 7d8819ca281ac96ec634e5f5cb3935ad76638fab Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Tue, 16 May 2023 12:21:33 +0100 Subject: [PATCH 20/21] Add context to the Job type --- e2core/syncer/syncer.go | 2 +- foundation/scheduler/core.go | 22 +++++++++++++--------- foundation/scheduler/ctx.go | 2 +- foundation/scheduler/job.go | 11 +++++++++++ foundation/scheduler/scheduler.go | 18 ++++++++++-------- foundation/scheduler/watcher.go | 13 ++++++------- foundation/scheduler/worker.go | 8 +++++--- sat/sat/embedded.go | 2 +- sat/sat/handler.go | 2 +- 9 files changed, 49 insertions(+), 31 deletions(-) diff --git a/e2core/syncer/syncer.go b/e2core/syncer/syncer.go index 73d695dd..35e4168f 100644 --- a/e2core/syncer/syncer.go +++ b/e2core/syncer/syncer.go @@ -62,7 +62,7 @@ func (s *Syncer) Start() error { } // sync once to seed the initial state - if _, err := s.sched.Do(context.Background(), scheduler.NewJob("sync", nil)).Then(); err != nil { + if _, err := s.sched.Do(scheduler.NewJob("sync", nil).WithContext(context.Background())).Then(); err != nil { return errors.Wrap(err, "failed to Do sync job") } diff --git a/foundation/scheduler/core.go b/foundation/scheduler/core.go index ca8d17a3..a5c851a6 100644 --- a/foundation/scheduler/core.go +++ b/foundation/scheduler/core.go @@ -1,7 +1,6 @@ package scheduler import ( - "context" "fmt" "sync" @@ -15,7 +14,7 @@ import ( // coreDoFunc is an internal version of DoFunc that takes a // Job pointer instead of a Job value for the best memory usage -type coreDoFunc func(ctx context.Context, job *Job) *Result +type coreDoFunc func(job *Job) *Result // core is the 'core scheduler' for reactr, handling execution of // Tasks, Jobs, and Schedules @@ -41,10 +40,12 @@ func newCore(log zerolog.Logger) *core { return c } -func (c *core) do(ctx context.Context, job *Job) *Result { - ctx, span := tracing.Tracer.Start(ctx, "core.do") +func (c *core) do(incomingJob *Job) *Result { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "core.do") defer span.End() + job := incomingJob.WithContext(ctx) + result := newResult(job.UUID()) span.AddEvent("created a new job", trace.WithAttributes( attribute.String("job-uuid", job.UUID()))) @@ -67,15 +68,18 @@ func (c *core) do(ctx context.Context, job *Job) *Result { return result } - go func(goctx context.Context) { - ctx, span := tracing.Tracer.Start(goctx, "go func inside core.do") + go func(gjob Job) { + ctx, span := tracing.Tracer.Start(gjob.Context(), "go func inside core.do") defer span.End() - job.result = result + ggjob := gjob.WithContext(ctx) + + ggjob.result = result + ll.Info().Msg("jobworker got a job scheduled") - jobWorker.schedule(ctx, job) - }(ctx) + jobWorker.schedule(&ggjob) + }(job) ll.Info().Msg("returning result from core.do func") return result diff --git a/foundation/scheduler/ctx.go b/foundation/scheduler/ctx.go index b4f69382..29f50fd7 100644 --- a/foundation/scheduler/ctx.go +++ b/foundation/scheduler/ctx.go @@ -39,7 +39,7 @@ func newCtx(doFunc coreDoFunc) *Ctx { // Do runs a new job func (c *Ctx) Do(job Job) *Result { - return c.doFunc(c.Context, &job) + return c.doFunc(&job) } func (c *Ctx) SetFFIResult(result []byte, err error) (*FFIResult, error) { diff --git a/foundation/scheduler/job.go b/foundation/scheduler/job.go index d98b6760..9225c354 100644 --- a/foundation/scheduler/job.go +++ b/foundation/scheduler/job.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "encoding/json" "errors" @@ -15,6 +16,7 @@ type Job struct { jobType string result *Result data interface{} + ctx context.Context req *request.CoordinatedRequest } @@ -88,3 +90,12 @@ func (j Job) Data() interface{} { func (j Job) Req() *request.CoordinatedRequest { return j.req } + +func (j Job) WithContext(ctx context.Context) Job { + j.ctx = ctx + return j +} + +func (j Job) Context() context.Context { + return j.ctx +} diff --git a/foundation/scheduler/scheduler.go b/foundation/scheduler/scheduler.go index 97104545..41aa4b18 100644 --- a/foundation/scheduler/scheduler.go +++ b/foundation/scheduler/scheduler.go @@ -23,7 +23,7 @@ const ( ) // JobFunc is a function that runs a job of a predetermined type -type JobFunc func(context.Context, interface{}) *Result +type JobFunc func(interface{}) *Result // Scheduler represents the main control object type Scheduler struct { @@ -53,10 +53,12 @@ func NewWithLogger(log zerolog.Logger) *Scheduler { } // Do schedules a job to be worked on and returns a result object -func (r *Scheduler) Do(ctx context.Context, job Job) *Result { - ctx, span := tracing.Tracer.Start(ctx, "scheduler do with job") +func (r *Scheduler) Do(incomingJob Job) *Result { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "scheduler do with job") defer span.End() + job := incomingJob.WithContext(ctx) + if job.Req() == nil { r.log.Info(). Str("requestID", "no-request"). @@ -67,7 +69,7 @@ func (r *Scheduler) Do(ctx context.Context, job Job) *Result { Msg("scheduler.Do function got called, passing it on to core.do") } - return r.core.do(ctx, &job) + return r.core.do(&job) } // Schedule adds a new Schedule to the instance, Scheduler will 'watch' the Schedule @@ -80,8 +82,8 @@ func (r *Scheduler) Schedule(s Schedule) { func (r *Scheduler) Register(jobType string, runner Runnable, options ...Option) JobFunc { r.core.register(jobType, runner, options...) - helper := func(ctx context.Context, data interface{}) *Result { - return r.Do(ctx, NewJob(jobType, data)) + helper := func(data interface{}) *Result { + return r.Do(NewJob(jobType, data)) } return helper @@ -149,9 +151,9 @@ func (r *Scheduler) ListenAndRun(pod *bus.Pod, msgType string, run func(bus.Mess span.AddEvent("new job from data and msg type", trace.WithAttributes( attribute.String("msgType", msgType), )) - job := NewJob(msgType, data) + job := NewJob(msgType, data).WithContext(ctx) - return r.Do(ctx, job) + return r.Do(job) } // each time a message is received with the associated type, diff --git a/foundation/scheduler/watcher.go b/foundation/scheduler/watcher.go index 4ef5f14c..1e573ef1 100644 --- a/foundation/scheduler/watcher.go +++ b/foundation/scheduler/watcher.go @@ -1,7 +1,6 @@ package scheduler import ( - "context" "sync" "time" @@ -44,25 +43,25 @@ func (w *watcher) watch(sched Schedule) { // loop forever and check each schedule for new jobs // repeating every second for { - remove := []string{} + remove := make([]string, 0) w.lock.RLock() - for uuid, s := range w.schedules { + for scheduledUUID, s := range w.schedules { if s.Done() { // set the schedule to be removed if it's done - remove = append(remove, uuid) + remove = append(remove, scheduledUUID) } else { if job := s.Check(); job != nil { // schedule the job and discard the result - w.scheduleFunc(context.Background(), job).Discard() + w.scheduleFunc(job).Discard() } } } w.lock.RUnlock() w.lock.Lock() - for _, uuid := range remove { - delete(w.schedules, uuid) + for _, uuidToRemove := range remove { + delete(w.schedules, uuidToRemove) } w.lock.Unlock() diff --git a/foundation/scheduler/worker.go b/foundation/scheduler/worker.go index 18f179b8..cf1a39b2 100644 --- a/foundation/scheduler/worker.go +++ b/foundation/scheduler/worker.go @@ -51,10 +51,12 @@ func newWorker(runner Runnable, doFunc coreDoFunc, opts workerOpts) *worker { return w } -func (w *worker) schedule(ctx context.Context, job *Job) { - ctx, span := tracing.Tracer.Start(ctx, "worker.schedule") +func (w *worker) schedule(incomingJob *Job) { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "worker.schedule") defer span.End() + job := incomingJob.WithContext(ctx) + go func(goctx context.Context) { _, span := tracing.Tracer.Start(goctx, "go func inside worker.schedule") defer span.End() @@ -65,7 +67,7 @@ func (w *worker) schedule(ctx context.Context, job *Job) { } span.AddEvent("adding job to the worker workchannel and incrementing the rate by one") - w.workChan <- job + w.workChan <- &job w.rate.add() }(ctx) } diff --git a/sat/sat/embedded.go b/sat/sat/embedded.go index 89b628c3..7536ef72 100644 --- a/sat/sat/embedded.go +++ b/sat/sat/embedded.go @@ -29,7 +29,7 @@ func (s *Sat) Exec(ctx context.Context, input []byte) (*request.CoordinatedRespo State: map[string][]byte{}, } - result, err := s.engine.Do(ctx, scheduler.NewJob(s.config.JobType, req)).Then() + result, err := s.engine.Do(scheduler.NewJob(s.config.JobType, req).WithContext(ctx)).Then() if err != nil { return nil, errors.Wrap(err, "failed to exec") } diff --git a/sat/sat/handler.go b/sat/sat/handler.go index 36095ebf..d8bbfcb4 100644 --- a/sat/sat/handler.go +++ b/sat/sat/handler.go @@ -40,7 +40,7 @@ func (s *Sat) handler(engine *engine2.Engine) echo.HandlerFunc { return echo.NewHTTPError(http.StatusInternalServerError, "unknown error").SetInternal(fmt.Errorf("module %s is not registered", s.config.JobType)) } - result, err := engine.Do(spanCtx, scheduler.NewJob(s.config.JobType, req)).Then() + result, err := engine.Do(scheduler.NewJob(s.config.JobType, req).WithContext(spanCtx)).Then() if err != nil { if errors.As(err, &runErr) { // runErr would be an actual error returned from a function From 7b7b50632650cd24231d142611649d94e50c920c Mon Sep 17 00:00:00 2001 From: Gabor Javorszky Date: Tue, 16 May 2023 14:16:36 +0100 Subject: [PATCH 21/21] Trace all the way down to execution --- e2core/auth/access.go | 6 ++++++ foundation/scheduler/worker.go | 10 ++++++---- foundation/scheduler/workthread.go | 16 ++++++++++++---- sat/engine2/runtime/pool.go | 11 +++++++++-- sat/engine2/wasmrunnable.go | 18 ++++++++++++++---- 5 files changed, 47 insertions(+), 14 deletions(-) diff --git a/e2core/auth/access.go b/e2core/auth/access.go index 1e81148f..891e5673 100644 --- a/e2core/auth/access.go +++ b/e2core/auth/access.go @@ -13,6 +13,7 @@ import ( "github.com/suborbital/e2core/e2core/options" "github.com/suborbital/e2core/foundation/common" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/systemspec/system" ) @@ -28,6 +29,11 @@ func AuthorizationMiddleware(opts *options.Options) echo.MiddlewareFunc { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { + ctx, span := tracing.Tracer.Start(c.Request().Context(), "authorization-middleware") + defer span.End() + + c.SetRequest(c.Request().WithContext(ctx)) + identifier := c.Param("ident") namespace := c.Param("namespace") name := c.Param("name") diff --git a/foundation/scheduler/worker.go b/foundation/scheduler/worker.go index cf1a39b2..e46b3913 100644 --- a/foundation/scheduler/worker.go +++ b/foundation/scheduler/worker.go @@ -1,7 +1,6 @@ package scheduler import ( - "context" "sync" "time" @@ -57,10 +56,13 @@ func (w *worker) schedule(incomingJob *Job) { job := incomingJob.WithContext(ctx) - go func(goctx context.Context) { - _, span := tracing.Tracer.Start(goctx, "go func inside worker.schedule") + go func(incomingJob Job) { + ctx, span := tracing.Tracer.Start(incomingJob.Context(), "go func inside worker.schedule") defer span.End() + job := incomingJob.WithContext(ctx) + + span.AddEvent("reconciling pool size in worker") if err := w.reconcilePoolSize(); err != nil { job.result.sendErr(errors.Wrap(err, "failed to reconcilePoolSize")) return @@ -69,7 +71,7 @@ func (w *worker) schedule(incomingJob *Job) { span.AddEvent("adding job to the worker workchannel and incrementing the rate by one") w.workChan <- &job w.rate.add() - }(ctx) + }(job) } // start ensures the worker is ready to receive jobs diff --git a/foundation/scheduler/workthread.go b/foundation/scheduler/workthread.go index d25b47fa..e0693498 100644 --- a/foundation/scheduler/workthread.go +++ b/foundation/scheduler/workthread.go @@ -3,6 +3,8 @@ package scheduler import ( "context" "time" + + "github.com/suborbital/e2core/foundation/tracing" ) type workThread struct { @@ -38,18 +40,22 @@ func (wt *workThread) run() { } // wait for the next job - job := <-wt.workChan + inJob := <-wt.workChan + ctx, span := tracing.Tracer.Start(inJob.Context(), "workthread.run in scheduler") + + job := inJob.WithContext(ctx) + var err error - ctx := newCtx(wt.doFunc) + workCtx := newCtx(wt.doFunc) var result interface{} if wt.timeoutSeconds == 0 { // we pass in a dereferenced job so that the Runner cannot modify it - result, err = wt.runner.Run(*job, ctx) + result, err = wt.runner.Run(job, workCtx) } else { - result, err = wt.runWithTimeout(job, ctx) + result, err = wt.runWithTimeout(&job, workCtx) } if err != nil { @@ -58,6 +64,8 @@ func (wt *workThread) run() { } job.result.sendResult(result) + + span.End() } }() } diff --git a/sat/engine2/runtime/pool.go b/sat/engine2/runtime/pool.go index 0f737901..d729d338 100644 --- a/sat/engine2/runtime/pool.go +++ b/sat/engine2/runtime/pool.go @@ -1,12 +1,14 @@ package runtime import ( + "context" "sync" "github.com/bytecodealliance/wasmtime-go/v7" "github.com/pkg/errors" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/engine2/api" "github.com/suborbital/e2core/sat/engine2/runtime/instance" "github.com/suborbital/systemspec/tenant" @@ -65,7 +67,10 @@ func (ip *InstancePool) RemoveInstance() error { } // UseInstance provides an instance from the environment's pool to be used by a callback function -func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, instFunc func(*instance.Instance, int32)) error { +func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, spanCtx context.Context, instFunc func(context.Context, *instance.Instance, int32)) error { + spanCtx, span := tracing.Tracer.Start(spanCtx, "instancePool.UseInstance") + defer span.End() + go func() { // prepare a new instance if err := ip.AddInstance(); err != nil { @@ -81,6 +86,7 @@ func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, instFunc func(*instance. it = nil }(inst) + span.AddEvent("instance.Store") // generate a random identifier as a reference to the instance in use to // easily allow the Wasm module to reference itself when calling back over the FFI ident, err := instance.Store(inst) @@ -88,11 +94,12 @@ func (ip *InstancePool) UseInstance(ctx *scheduler.Ctx, instFunc func(*instance. return errors.Wrap(err, "failed to setupNewIdentifier") } + span.AddEvent("instance.UseCtx") // setup the instance's temporary state inst.UseCtx(ctx) // do the actual call into the Wasm module - instFunc(inst, ident) + instFunc(spanCtx, inst, ident) // clear the instance's temporary state inst.UseCtx(nil) diff --git a/sat/engine2/wasmrunnable.go b/sat/engine2/wasmrunnable.go index 9f36735a..cd2e1355 100644 --- a/sat/engine2/wasmrunnable.go +++ b/sat/engine2/wasmrunnable.go @@ -1,10 +1,13 @@ package engine2 import ( + "context" + "github.com/pkg/errors" "github.com/suborbital/e2core/e2core/sequence" "github.com/suborbital/e2core/foundation/scheduler" + "github.com/suborbital/e2core/foundation/tracing" "github.com/suborbital/e2core/sat/engine2/api" "github.com/suborbital/e2core/sat/engine2/runtime" "github.com/suborbital/e2core/sat/engine2/runtime/instance" @@ -33,17 +36,20 @@ func newRunnerFromRef(ref *tenant.WasmModuleRef, api api.HostAPI) *wasmRunner { } // Run runs a wasmRunner -func (w *wasmRunner) Run(job scheduler.Job, ctx *scheduler.Ctx) (interface{}, error) { +func (w *wasmRunner) Run(incomingJob scheduler.Job, ctx *scheduler.Ctx) (interface{}, error) { + spanCtx, span := tracing.Tracer.Start(incomingJob.Context(), "wasmRunner.Run") + defer span.End() + + job := incomingJob.WithContext(spanCtx) + var jobBytes []byte var req *request.CoordinatedRequest // check to ensure the job is a CoordinatedRequest (pointer or bytes), and set up the WasmInstance if jobReq, ok := job.Data().(*request.CoordinatedRequest); ok { req = jobReq - } else if jobReq, err := request.FromJSON(job.Bytes()); err == nil { req = jobReq - } else { return nil, errors.New("job data is not a CoordinatedRequest") } @@ -71,13 +77,17 @@ func (w *wasmRunner) Run(job scheduler.Job, ctx *scheduler.Ctx) (interface{}, er var runErr error var callErr error - if err := w.pool.UseInstance(ctx, func(instance *instance.Instance, ident int32) { + if err := w.pool.UseInstance(ctx, spanCtx, func(ctx context.Context, instance *instance.Instance, ident int32) { + _, span := tracing.Tracer.Start(ctx, "instance function") + defer span.End() + inPointer, writeErr := instance.WriteMemory(jobBytes) if writeErr != nil { runErr = errors.Wrap(writeErr, "failed to instance.writeMemory") return } + span.AddEvent("instance.Call run_e") // execute the module's Run function, passing the input data and ident // set runErr but don't return because the ExecutionResult error should also be grabbed _, callErr = instance.Call("run_e", inPointer, int32(len(jobBytes)), ident)