From 54df40036cb30ab3b8d0a8b43cab8bad29f65c75 Mon Sep 17 00:00:00 2001 From: Wes Date: Tue, 3 Jun 2025 10:51:19 -0700 Subject: [PATCH 1/4] feat: new buildengine experiments --- cmd/ftl/cmd_build.go | 18 +- internal/buildengine/engine_v2.go | 474 ++++++++++++++++++++++++++++++ 2 files changed, 479 insertions(+), 13 deletions(-) create mode 100644 internal/buildengine/engine_v2.go diff --git a/cmd/ftl/cmd_build.go b/cmd/ftl/cmd_build.go index dd1ba61bdf..7867a9c4ca 100644 --- a/cmd/ftl/cmd_build.go +++ b/cmd/ftl/cmd_build.go @@ -5,8 +5,6 @@ import ( errors "github.com/alecthomas/errors" - "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1/adminpbconnect" - "github.com/block/ftl/common/log" "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/schema/schemaeventsource" @@ -20,11 +18,9 @@ type buildCmd struct { func (b *buildCmd) Run( ctx context.Context, - adminClient adminpbconnect.AdminServiceClient, schemaSource *schemaeventsource.EventSource, projConfig projectconfig.Config, ) error { - logger := log.FromContext(ctx) if len(b.Dirs) == 0 { b.Dirs = projConfig.AbsModuleDirs() } @@ -35,24 +31,20 @@ func (b *buildCmd) Run( // Cancel build engine context to ensure all language plugins are killed. ctx, cancel := context.WithCancelCause(ctx) defer cancel(errors.Wrap(context.Canceled, "build stopped")) - engine, err := buildengine.New( + engine, err := buildengine.NewV2( ctx, - adminClient, schemaSource, projConfig, b.Dirs, false, - buildengine.BuildEnv(b.BuildEnv), - buildengine.Parallelism(b.Parallelism), + buildengine.BuildEnvV2(b.BuildEnv), + buildengine.ParallelismV2(b.Parallelism), ) if err != nil { return errors.WithStack(err) } - if len(engine.Modules()) == 0 { - logger.Warnf("No modules were found to build") - return nil - } - if err := engine.Build(ctx); err != nil { + + if err := engine.BuildV2(ctx, true); err != nil { return errors.Wrap(err, "build failed") } return nil diff --git a/internal/buildengine/engine_v2.go b/internal/buildengine/engine_v2.go new file mode 100644 index 0000000000..81d9392ee1 --- /dev/null +++ b/internal/buildengine/engine_v2.go @@ -0,0 +1,474 @@ +package buildengine + +import ( + "context" + "runtime" + "time" + + "golang.org/x/exp/maps" + + errors "github.com/alecthomas/errors" + "github.com/alecthomas/types/pubsub" + buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1" + "github.com/block/ftl/common/log" + "github.com/block/ftl/common/schema" + "github.com/block/ftl/internal/buildengine/languageplugin" + "github.com/block/ftl/internal/dev" + "github.com/block/ftl/internal/moduleconfig" + "github.com/block/ftl/internal/projectconfig" + "github.com/block/ftl/internal/watch" + "github.com/puzpuzpuz/xsync/v3" + "golang.org/x/sync/errgroup" +) + +// isBuildComplete returns true if the module is considered complete based on the deployAfterBuild flag +func isBuildComplete(event *buildenginepb.EngineEvent) bool { + switch event.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildSuccess, + *buildenginepb.EngineEvent_ModuleDeployWaiting, + *buildenginepb.EngineEvent_ModuleDeployStarted, + *buildenginepb.EngineEvent_ModuleDeploySuccess, + *buildenginepb.EngineEvent_ModuleDeployFailed: + return true + default: + return false + } +} + +// isModuleComplete returns true if the module is considered complete based on the deployAfterBuild flag +func isModuleComplete(event *buildenginepb.EngineEvent, deployAfterBuild bool) bool { + switch event.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildSuccess: + return !deployAfterBuild + case *buildenginepb.EngineEvent_ModuleDeploySuccess: + return deployAfterBuild + case *buildenginepb.EngineEvent_ModuleBuildFailed, + *buildenginepb.EngineEvent_ModuleDeployFailed: + return true + default: + return false + } +} + +// moduleMeta is a wrapper around a module that includes the last build's start time. +type moduleMetaV2 struct { + module Module + plugin *languageplugin.LanguagePlugin + configDefaults moduleconfig.CustomDefaults + state *buildenginepb.EngineEvent + deployAfterBuild bool +} + +type EngineV2 struct { + projectConfig projectconfig.Config + parallelism int + moduleDirs []string + moduleMetas *xsync.MapOf[string, *moduleMetaV2] + builtModules *xsync.MapOf[string, *schema.Module] + stateChanges *pubsub.Topic[StateChange] + buildEnv []string + os string + arch string + devMode bool + + // TODO: Can we remove this? + devModeEndpointUpdates chan dev.LocalEndpoint +} + +type EngineV2Option func(o *EngineV2) + +func BuildEnvV2(env []string) EngineV2Option { + return func(o *EngineV2) { + o.buildEnv = env + } +} + +func ParallelismV2(n int) EngineV2Option { + return func(o *EngineV2) { + o.parallelism = n + } +} + +// StateChange represents a state change event for a specific module. +type StateChange struct { + Module string + Event *buildenginepb.EngineEvent +} + +// initializeModule handles the initialization of a single module, including: +// - Getting language-specific defaults +// - Validating and filling config defaults +// - Creating the language plugin +// - Creating the module instance +func NewV2( + ctx context.Context, + _ interface{}, // schemaSource placeholder + projectConfig projectconfig.Config, + moduleDirs []string, + _ bool, // logChanges placeholder + options ...EngineV2Option, +) (*EngineV2, error) { + logger := log.FromContext(ctx).Scope("engine") + ctx = log.ContextWithLogger(ctx, logger) + + e := &EngineV2{ + projectConfig: projectConfig, + parallelism: runtime.NumCPU(), + moduleDirs: moduleDirs, + moduleMetas: xsync.NewMapOf[string, *moduleMetaV2](), + builtModules: xsync.NewMapOf[string, *schema.Module](), + stateChanges: pubsub.New[StateChange](), + } + for _, option := range options { + option(e) + } + + // Discover modules + configs, err := watch.DiscoverModules(ctx, moduleDirs) + if err != nil { + return nil, errors.Wrap(err, "could not discover modules") + } + + err = CleanStubs(ctx, projectConfig.Root(), configs) + if err != nil { + return nil, errors.Wrap(err, "failed to clean stubs") + } + + logger.Infof("Initializing modules") + + wg := &errgroup.Group{} + for _, config := range configs { + wg.Go(func() error { + return e.initModuleMeta(ctx, config) + }) + } + + if err := wg.Wait(); err != nil { + return nil, errors.WithStack(err) + } + + go e.processChanges(ctx) + + return e, nil +} + +// checkModuleState verifies that a module exists and its current state matches the expected state. +func (e *EngineV2) checkModuleState(ctx context.Context, module string, expectedEvent *buildenginepb.EngineEvent) (*moduleMetaV2, error) { + logger := log.FromContext(ctx).Scope("engine") + meta, ok := e.moduleMetas.Load(module) + if !ok { + logger.Warnf("Module %s not found", module) + return nil, errors.Errorf("module %s not found", module) + } + if meta.state.Event != expectedEvent.Event { + logger.Tracef("Module %s state mismatch: got %T(%+v), expected %T(%+v)", module, meta.state.Event, meta.state.Event, expectedEvent.Event, expectedEvent.Event) + return nil, errors.Errorf("module %s state mismatch: got %T(%+v), expected %T(%+v)", module, meta.state.Event, meta.state.Event, expectedEvent.Event, expectedEvent.Event) + } + return meta, nil +} + +func (e *EngineV2) processChanges(ctx context.Context) { + ch := make(chan StateChange, 128) + e.stateChanges.Subscribe(ch) + logger := log.FromContext(ctx).Scope("engine") + logger.Infof("Processing changes with parallelism %d", e.parallelism) + + wg := errgroup.Group{} + wg.SetLimit(e.parallelism) + + for { + select { + case <-ctx.Done(): + wg.Wait() + return + case stateChange := <-ch: + wg.Go(func() error { + logger := log.FromContext(ctx).Scope(stateChange.Module).Module(stateChange.Module) + ctx = log.ContextWithLogger(ctx, logger) + + meta, err := e.checkModuleState(ctx, stateChange.Module, stateChange.Event) + if err != nil { + return err + } + + switch stateChange.Event.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildWaiting: + logger.Infof("Build waiting...") + e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ + ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{}, + }, + }) + case *buildenginepb.EngineEvent_ModuleBuildStarted: + logger.Infof("Building...") + + err := e.buildModule(ctx, meta) + if err != nil { + return err + } + case *buildenginepb.EngineEvent_ModuleBuildFailed: + logger.Infof("Build failed") + case *buildenginepb.EngineEvent_ModuleBuildSuccess: + logger.Infof("Build success") + dependentModules := e.getDependentModuleNames(stateChange.Module) + for _, dependentModule := range dependentModules { + e.tryStartBuild(ctx, dependentModule) + } + + if meta.deployAfterBuild { + e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleDeployStarted{ + ModuleDeployStarted: &buildenginepb.ModuleDeployStarted{}, + }, + }) + } + + case *buildenginepb.EngineEvent_ModuleDeployWaiting: + logger.Infof("Deploy waiting...") + case *buildenginepb.EngineEvent_ModuleDeployStarted: + logger.Infof("Deploying...") + time.Sleep(1 * time.Second) + e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleDeploySuccess{ + ModuleDeploySuccess: &buildenginepb.ModuleDeploySuccess{}, + }, + }) + case *buildenginepb.EngineEvent_ModuleDeployFailed: + logger.Infof("Deploy failed") + case *buildenginepb.EngineEvent_ModuleDeploySuccess: + logger.Infof("Deploy success") + default: + logger.Infof("State change: %v", stateChange) + } + return nil + }) + } + } +} + +func (e *EngineV2) buildModule(ctx context.Context, meta *moduleMetaV2) error { + logger := log.FromContext(ctx) + logger.Infof("Building module %s", meta.module.Config.Module) + + builtModules := make([]*schema.Module, 0) + e.builtModules.Range(func(key string, module *schema.Module) bool { + builtModules = append(builtModules, module) + return true + }) + + logger.Infof("Built modules: %v", builtModules) + + sch := &schema.Schema{Realms: []*schema.Realm{{Modules: builtModules}}} //nolint:exptostd + + moduleSchema, _, _, err := build(ctx, e.projectConfig, meta.module, meta.plugin, languageplugin.BuildContext{ + Config: meta.module.Config, + Schema: sch, + Dependencies: meta.module.Dependencies(Raw), + BuildEnv: e.buildEnv, + Os: e.os, + Arch: e.arch, + }, e.devMode, e.devModeEndpointUpdates) + if err != nil { + logger.Errorf(err, "Failed to build module %s", meta.module.Config.Module) + return errors.Wrap(err, "failed to build module") + } + logger.Infof("Built module %s", meta.module.Config.Module) + + e.builtModules.Store(meta.module.Config.Module, moduleSchema) + e.updateModuleState(ctx, meta.module.Config.Module, &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleBuildSuccess{ + ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{}, + }, + }) + + return nil +} + +// updateModuleState updates the state of a module and publishes the change. +func (e *EngineV2) updateModuleState(ctx context.Context, module string, event *buildenginepb.EngineEvent) error { + logger := log.FromContext(ctx).Scope("engine") + logger.Tracef("Updating module state: %s, %v", module, event) + meta, ok := e.moduleMetas.Load(module) + if !ok { + return errors.Errorf("module %s not found", module) + } + meta.state = event + e.moduleMetas.Store(module, meta) + e.stateChanges.Publish(StateChange{ + Module: module, + Event: event, + }) + return nil +} + +// Main entry for testing +func (e *EngineV2) BuildV2(ctx context.Context, deployAfterBuild bool) error { + logger := log.FromContext(ctx).Scope("engine") + logger.Infof("Building modules") + + // Create a channel to receive state changes + stateCh := make(chan StateChange, 64) + e.stateChanges.Subscribe(stateCh) + defer e.stateChanges.Unsubscribe(stateCh) + + // Track module build states + moduleStates := make(map[string]bool) + totalModules := 0 + completedModules := 0 + + // Initialize module states and start builds + e.moduleMetas.Range(func(key string, meta *moduleMetaV2) bool { + moduleStates[key] = false + totalModules++ + meta.deployAfterBuild = deployAfterBuild + e.moduleMetas.Store(key, meta) + e.tryStartBuild(ctx, key) + return true + }) + + // Monitor state changes until all modules are complete + for { + select { + case <-ctx.Done(): + return errors.Errorf("build cancelled: %w", ctx.Err()) + case stateChange := <-stateCh: + module := stateChange.Module + switch stateChange.Event.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildSuccess, + *buildenginepb.EngineEvent_ModuleBuildFailed, + *buildenginepb.EngineEvent_ModuleDeploySuccess, + *buildenginepb.EngineEvent_ModuleDeployFailed: + if !moduleStates[module] && isModuleComplete(stateChange.Event, deployAfterBuild) { + moduleStates[module] = true + completedModules++ + switch stateChange.Event.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildFailed: + logger.Infof("Module %s build failed", module) + case *buildenginepb.EngineEvent_ModuleDeployFailed: + logger.Infof("Module %s deploy failed", module) + } + } + } + + // Check if all modules are complete + if completedModules == totalModules { + // Verify if any modules failed + hasFailures := false + for module, completed := range moduleStates { + if !completed { + hasFailures = true + logger.Warnf("Module %s did not complete successfully", module) + } + } + if hasFailures { + return errors.Errorf("build failed: one or more modules failed to build") + } + // Sleep to allow logger to flush + time.Sleep(100 * time.Millisecond) + if deployAfterBuild { + logger.Infof("All modules built and deployed successfully") + } else { + logger.Infof("All modules built successfully") + } + return nil + } + } + } +} + +func (e *EngineV2) tryStartBuild(ctx context.Context, module string) error { + logger := log.FromContext(ctx).Scope("engine") + + meta, ok := e.moduleMetas.Load(module) + if !ok { + return errors.Errorf("module %s not found", module) + } + + dependencies := meta.module.Dependencies(AlwaysIncludeBuiltin) + for _, dependency := range dependencies { + if dependency == "builtin" { + continue + } + meta, ok := e.moduleMetas.Load(dependency) + if !ok { + logger.Warnf("Dependency %s not found for %s", dependency, module) + return nil + } + if !isBuildComplete(meta.state) { + logger.Warnf("Dependency %s not built for %s", dependency, module) + e.updateModuleState(ctx, module, &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ + ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{}, + }, + }) + return nil + } + } + + logger.Infof("Starting build for %s", module) + + e.updateModuleState(ctx, module, &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ + ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{}, + }, + }) + + return nil +} + +func (e *EngineV2) initModuleMeta(ctx context.Context, config moduleconfig.UnvalidatedModuleConfig) error { + plugin, err := languageplugin.New(ctx, config.Dir, config.Language, config.Module) + if err != nil { + return errors.Wrapf(err, "could not create plugin for %s", config.Module) + } + + customDefaults, err := languageplugin.GetModuleConfigDefaults(ctx, config.Language, config.Dir) + if err != nil { + return errors.Wrapf(err, "could not get defaults provider for %s", config.Module) + } + validConfig, err := config.FillDefaultsAndValidate(customDefaults, e.projectConfig) + if err != nil { + return errors.Wrapf(err, "could not apply defaults for %s", config.Module) + } + meta := moduleMetaV2{ + module: newModule(validConfig), + plugin: plugin, + configDefaults: customDefaults, + state: &buildenginepb.EngineEvent{ + Event: &buildenginepb.EngineEvent_ModuleAdded{ + ModuleAdded: &buildenginepb.ModuleAdded{ + Module: config.Module, + }, + }, + }, + deployAfterBuild: true, + } + + dependencies, err := meta.plugin.GetDependencies(ctx, meta.module.Config) + if err != nil { + return errors.Wrapf(err, "could not get dependencies for %v", meta.module.Config.Module) + } + + meta.module = meta.module.CopyWithDependencies(dependencies) + + e.moduleMetas.Store(config.Module, &meta) + e.stateChanges.Publish(StateChange{ + Module: config.Module, + Event: &buildenginepb.EngineEvent{Event: &buildenginepb.EngineEvent_ModuleAdded{ModuleAdded: &buildenginepb.ModuleAdded{Module: config.Module}}}, + }) + + return nil +} + +func (e *EngineV2) getDependentModuleNames(moduleName string) []string { + dependentModuleNames := map[string]bool{} + e.moduleMetas.Range(func(name string, meta *moduleMetaV2) bool { + for _, dep := range meta.module.Dependencies(AlwaysIncludeBuiltin) { + if dep == moduleName { + dependentModuleNames[name] = true + } + } + return true + }) + return maps.Keys(dependentModuleNames) +} From e010c9cbf187ea029537a6ac9579cb00e9d89d87 Mon Sep 17 00:00:00 2001 From: Wes Date: Wed, 4 Jun 2025 11:31:34 -0700 Subject: [PATCH 2/4] feat: make cmd_deploy work --- cmd/ftl/cmd_build.go | 4 +- cmd/ftl/cmd_deploy.go | 16 +- internal/buildengine/engine_v2.go | 330 ++++++++++++++++++++++++++---- 3 files changed, 295 insertions(+), 55 deletions(-) diff --git a/cmd/ftl/cmd_build.go b/cmd/ftl/cmd_build.go index 7867a9c4ca..566db4c45d 100644 --- a/cmd/ftl/cmd_build.go +++ b/cmd/ftl/cmd_build.go @@ -18,6 +18,7 @@ type buildCmd struct { func (b *buildCmd) Run( ctx context.Context, + adminClient buildengine.AdminClient, schemaSource *schemaeventsource.EventSource, projConfig projectconfig.Config, ) error { @@ -33,6 +34,7 @@ func (b *buildCmd) Run( defer cancel(errors.Wrap(context.Canceled, "build stopped")) engine, err := buildengine.NewV2( ctx, + adminClient, schemaSource, projConfig, b.Dirs, @@ -44,7 +46,7 @@ func (b *buildCmd) Run( return errors.WithStack(err) } - if err := engine.BuildV2(ctx, true); err != nil { + if err := engine.BuildV2(ctx, false, false); err != nil { return errors.Wrap(err, "build failed") } return nil diff --git a/cmd/ftl/cmd_deploy.go b/cmd/ftl/cmd_deploy.go index 4f9a9fa03f..f49f6360ed 100644 --- a/cmd/ftl/cmd_deploy.go +++ b/cmd/ftl/cmd_deploy.go @@ -8,7 +8,6 @@ import ( "github.com/alecthomas/types/optional" "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1/adminpbconnect" - "github.com/block/ftl/common/log" "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/schema/schemaeventsource" @@ -28,8 +27,6 @@ func (d *deployCmd) Run( adminClient adminpbconnect.AdminServiceClient, schemaSource *schemaeventsource.EventSource, ) error { - logger := log.FromContext(ctx) - if !schemaSource.WaitForInitialSync(ctx) { return errors.Errorf("timed out waiting for schema sync from server") } @@ -43,23 +40,18 @@ func (d *deployCmd) Run( ctx, cancel = context.WithCancelCause(ctx) defer cancel(errors.Wrap(context.Canceled, "stopping deploy")) } - engine, err := buildengine.New( + engine, err := buildengine.NewV2( ctx, adminClient, schemaSource, projConfig, d.Build.Dirs, true, - buildengine.BuildEnv(d.Build.BuildEnv), - buildengine.Parallelism(d.Build.Parallelism), + buildengine.BuildEnvV2(d.Build.BuildEnv), + buildengine.ParallelismV2(d.Build.Parallelism), ) if err != nil { return errors.WithStack(err) } - if len(engine.Modules()) == 0 { - logger.Warnf("No modules were found to deploy") - return nil - } - err = engine.BuildAndDeploy(ctx, d.Replicas, !d.NoWait, true) + err = engine.BuildV2(ctx, true, !d.NoWait) if err != nil { return errors.Wrap(err, "failed to deploy") } - logger.Infof("Deployed modules %v", engine.Modules()) //nolint terminal.FromContext(ctx).Close() return nil } diff --git a/internal/buildengine/engine_v2.go b/internal/buildengine/engine_v2.go index 81d9392ee1..1ad8755090 100644 --- a/internal/buildengine/engine_v2.go +++ b/internal/buildengine/engine_v2.go @@ -5,17 +5,25 @@ import ( "runtime" "time" + "connectrpc.com/connect" "golang.org/x/exp/maps" + "google.golang.org/protobuf/types/known/timestamppb" errors "github.com/alecthomas/errors" "github.com/alecthomas/types/pubsub" + "github.com/alecthomas/types/result" + adminpb "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1" buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1" + langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" + "github.com/block/ftl/common/key" "github.com/block/ftl/common/log" "github.com/block/ftl/common/schema" "github.com/block/ftl/internal/buildengine/languageplugin" "github.com/block/ftl/internal/dev" "github.com/block/ftl/internal/moduleconfig" "github.com/block/ftl/internal/projectconfig" + "github.com/block/ftl/internal/realm" + "github.com/block/ftl/internal/schema/schemaeventsource" "github.com/block/ftl/internal/watch" "github.com/puzpuzpuz/xsync/v3" "golang.org/x/sync/errgroup" @@ -57,20 +65,23 @@ type moduleMetaV2 struct { configDefaults moduleconfig.CustomDefaults state *buildenginepb.EngineEvent deployAfterBuild bool + pendingDeploy *pendingModule } type EngineV2 struct { - projectConfig projectconfig.Config - parallelism int - moduleDirs []string - moduleMetas *xsync.MapOf[string, *moduleMetaV2] - builtModules *xsync.MapOf[string, *schema.Module] - stateChanges *pubsub.Topic[StateChange] - buildEnv []string - os string - arch string - devMode bool - + adminClient AdminClient + projectConfig projectconfig.Config + parallelism int + moduleDirs []string + moduleMetas *xsync.MapOf[string, *moduleMetaV2] + builtModules *xsync.MapOf[string, *schema.Module] + stateChanges *pubsub.Topic[StateChange] + buildEnv []string + os string + arch string + devMode bool + externalRealms []*schema.Realm + schemaSource *schemaeventsource.EventSource // TODO: Can we remove this? devModeEndpointUpdates chan dev.LocalEndpoint } @@ -89,6 +100,14 @@ func ParallelismV2(n int) EngineV2Option { } } +// WithDevMode sets the engine to dev mode. +func WithDevModeV2(updates chan dev.LocalEndpoint) EngineV2Option { + return func(o *EngineV2) { + o.devModeEndpointUpdates = updates + o.devMode = true + } +} + // StateChange represents a state change event for a specific module. type StateChange struct { Module string @@ -102,27 +121,50 @@ type StateChange struct { // - Creating the module instance func NewV2( ctx context.Context, - _ interface{}, // schemaSource placeholder + adminClient AdminClient, + schemaSource *schemaeventsource.EventSource, projectConfig projectconfig.Config, moduleDirs []string, _ bool, // logChanges placeholder options ...EngineV2Option, + ) (*EngineV2, error) { logger := log.FromContext(ctx).Scope("engine") ctx = log.ContextWithLogger(ctx, logger) e := &EngineV2{ - projectConfig: projectConfig, - parallelism: runtime.NumCPU(), - moduleDirs: moduleDirs, - moduleMetas: xsync.NewMapOf[string, *moduleMetaV2](), - builtModules: xsync.NewMapOf[string, *schema.Module](), - stateChanges: pubsub.New[StateChange](), + adminClient: adminClient, + projectConfig: projectConfig, + parallelism: runtime.NumCPU(), + moduleDirs: moduleDirs, + moduleMetas: xsync.NewMapOf[string, *moduleMetaV2](), + builtModules: xsync.NewMapOf[string, *schema.Module](), + stateChanges: pubsub.New[StateChange](), + externalRealms: []*schema.Realm{}, + schemaSource: schemaSource, + arch: runtime.GOARCH, // Default to the local env, we attempt to read these from the cluster later + os: runtime.GOOS, } for _, option := range options { option(e) } + // Ensure schema sync at startup if we have an admin client + if e.adminClient != nil { + info, err := adminClient.ClusterInfo(ctx, connect.NewRequest(&adminpb.ClusterInfoRequest{})) + if err != nil { + log.FromContext(ctx).Debugf("failed to get cluster info: %s", err) + } else { + e.os = info.Msg.Os + e.arch = info.Msg.Arch + } + logger.Infof("Waiting for initial schema sync") + if !e.schemaSource.WaitForInitialSync(ctx) { + return nil, errors.Errorf("timed out waiting for initial schema sync from server") + } + logger.Infof("Initial schema sync complete") + } + // Discover modules configs, err := watch.DiscoverModules(ctx, moduleDirs) if err != nil { @@ -136,8 +178,12 @@ func NewV2( logger.Infof("Initializing modules") + jvm := false wg := &errgroup.Group{} for _, config := range configs { + if config.Language == "java" || config.Language == "kotlin" { + jvm = true + } wg.Go(func() error { return e.initModuleMeta(ctx, config) }) @@ -147,6 +193,43 @@ func NewV2( return nil, errors.WithStack(err) } + if jvm { + // Huge hack that is just for development + // In release builds this is a noop + // This makes sure the JVM jars are up to date when running from source + buildRequiredJARS(ctx) + } + + // Initialize builtModules with builtins + builtModules := map[string]*schema.Module{ + "builtin": schema.Builtins(), + } + + // Create metasMap from moduleMetas + metasMap := map[string]moduleMeta{} + e.moduleMetas.Range(func(name string, meta *moduleMetaV2) bool { + metasMap[name] = moduleMeta{ + module: meta.module, + plugin: meta.plugin, + } + return true + }) + + // Generate stubs for builtins after all modules are initialized + logger.Infof("Generating stubs for builtins module") + err = GenerateStubs(ctx, e.projectConfig.Root(), maps.Values(builtModules), metasMap) + if err != nil { + return nil, errors.WithStack(err) + } + + for name, cfg := range projectConfig.ExternalRealms { + realm, err := realm.GetExternalRealm(ctx, e.projectConfig.ExternalRealmPath(), name, cfg) + if err != nil { + return nil, errors.Wrapf(err, "failed to read external realm %s", name) + } + e.externalRealms = append(e.externalRealms, realm) + } + go e.processChanges(ctx) return e, nil @@ -194,9 +277,17 @@ func (e *EngineV2) processChanges(ctx context.Context) { switch stateChange.Event.Event.(type) { case *buildenginepb.EngineEvent_ModuleBuildWaiting: logger.Infof("Build waiting...") + proto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) + if err != nil { + logger.Errorf(err, "failed to marshal module config") + return err + } e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ - ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{}, + ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{ + Config: proto, + }, }, }) case *buildenginepb.EngineEvent_ModuleBuildStarted: @@ -204,21 +295,25 @@ func (e *EngineV2) processChanges(ctx context.Context) { err := e.buildModule(ctx, meta) if err != nil { - return err + return e.handleModuleError(ctx, stateChange, err) } case *buildenginepb.EngineEvent_ModuleBuildFailed: logger.Infof("Build failed") case *buildenginepb.EngineEvent_ModuleBuildSuccess: logger.Infof("Build success") + dependentModules := e.getDependentModuleNames(stateChange.Module) for _, dependentModule := range dependentModules { - e.tryStartBuild(ctx, dependentModule) + e.startBuild(ctx, dependentModule) } if meta.deployAfterBuild { e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), Event: &buildenginepb.EngineEvent_ModuleDeployStarted{ - ModuleDeployStarted: &buildenginepb.ModuleDeployStarted{}, + ModuleDeployStarted: &buildenginepb.ModuleDeployStarted{ + Module: meta.module.Config.Module, + }, }, }) } @@ -227,12 +322,10 @@ func (e *EngineV2) processChanges(ctx context.Context) { logger.Infof("Deploy waiting...") case *buildenginepb.EngineEvent_ModuleDeployStarted: logger.Infof("Deploying...") - time.Sleep(1 * time.Second) - e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ - Event: &buildenginepb.EngineEvent_ModuleDeploySuccess{ - ModuleDeploySuccess: &buildenginepb.ModuleDeploySuccess{}, - }, - }) + err := e.deployModule(ctx, meta) + if err != nil { + return e.handleModuleError(ctx, stateChange, err) + } case *buildenginepb.EngineEvent_ModuleDeployFailed: logger.Infof("Deploy failed") case *buildenginepb.EngineEvent_ModuleDeploySuccess: @@ -250,17 +343,31 @@ func (e *EngineV2) buildModule(ctx context.Context, meta *moduleMetaV2) error { logger := log.FromContext(ctx) logger.Infof("Building module %s", meta.module.Config.Module) + // Ensure all dependencies are built and their stubs are generated + dependencies := meta.module.Dependencies(AlwaysIncludeBuiltin) + for _, dep := range dependencies { + if dep == "builtin" { + continue + } + depMeta, ok := e.moduleMetas.Load(dep) + if !ok { + return errors.Errorf("dependency %s not found", dep) + } + if !isBuildComplete(depMeta.state) { + logger.Debugf("Dependency %s not built", dep) + return nil + } + } + builtModules := make([]*schema.Module, 0) e.builtModules.Range(func(key string, module *schema.Module) bool { builtModules = append(builtModules, module) return true }) - logger.Infof("Built modules: %v", builtModules) - sch := &schema.Schema{Realms: []*schema.Realm{{Modules: builtModules}}} //nolint:exptostd - moduleSchema, _, _, err := build(ctx, e.projectConfig, meta.module, meta.plugin, languageplugin.BuildContext{ + moduleSchema, tmpDeployDir, deployPaths, err := build(ctx, e.projectConfig, meta.module, meta.plugin, languageplugin.BuildContext{ Config: meta.module.Config, Schema: sch, Dependencies: meta.module.Dependencies(Raw), @@ -274,14 +381,87 @@ func (e *EngineV2) buildModule(ctx context.Context, meta *moduleMetaV2) error { } logger.Infof("Built module %s", meta.module.Config.Module) + // Generate stubs for the successfully built module + metasMap := map[string]moduleMeta{} + e.moduleMetas.Range(func(name string, meta *moduleMetaV2) bool { + metasMap[name] = moduleMeta{ + module: meta.module, + plugin: meta.plugin, + } + return true + }) + err = GenerateStubs(ctx, e.projectConfig.Root(), []*schema.Module{moduleSchema}, metasMap) + if err != nil { + logger.Errorf(err, "Failed to generate stubs for module %s", meta.module.Config.Module) + return err + } + + configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) + if err != nil { + logger.Errorf(err, "Failed to marshal module config") + return err + } + + pendingDeploy := newPendingModule(meta.module, tmpDeployDir, deployPaths, moduleSchema) + meta.pendingDeploy = pendingDeploy + e.moduleMetas.Store(meta.module.Config.Module, meta) e.builtModules.Store(meta.module.Config.Module, moduleSchema) - e.updateModuleState(ctx, meta.module.Config.Module, &buildenginepb.EngineEvent{ + + return e.updateModuleState(ctx, meta.module.Config.Module, &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), Event: &buildenginepb.EngineEvent_ModuleBuildSuccess{ - ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{}, + ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{ + Config: configProto, + }, }, }) +} - return nil +func (e *EngineV2) deployModule(ctx context.Context, meta *moduleMetaV2) error { + logger := log.FromContext(ctx) + logger.Infof("Deploying module %s", meta.module.Config.Module) + + sch, ok := e.builtModules.Load(meta.module.Config.Module) + if !ok { + return errors.Errorf("module %s not found", meta.module.Config.Module) + } + + if sch.Runtime == nil { + sch.Runtime = &schema.ModuleRuntime{ + Base: schema.ModuleRuntimeBase{ + CreateTime: time.Now(), + }, + } + } + sch.Runtime.Base.Language = meta.module.Config.Language + + // Upload artifacts first + if err := uploadArtefacts(ctx, meta.pendingDeploy, e.adminClient); err != nil { + return errors.WithStack(err) + } + + sch.Metadata = meta.pendingDeploy.schema.Metadata + e.builtModules.Store(meta.module.Config.Module, sch) + + // Deploy the module + keyChan := make(chan result.Result[key.Changeset], 1) + if err := deploy(ctx, e.projectConfig.Name, []*schema.Module{sch}, e.adminClient, keyChan, e.externalRealms); err != nil { + return errors.WithStack(err) + } + + // Handle deployment completion + if key, ok := (<-keyChan).Get(); ok { + logger.Debugf("Created changeset %s for module %s", key, meta.module.Config.Module) + } + + return e.updateModuleState(ctx, meta.module.Config.Module, &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleDeploySuccess{ + ModuleDeploySuccess: &buildenginepb.ModuleDeploySuccess{ + Module: meta.module.Config.Module, + }, + }, + }) } // updateModuleState updates the state of a module and publishes the change. @@ -301,8 +481,57 @@ func (e *EngineV2) updateModuleState(ctx context.Context, module string, event * return nil } -// Main entry for testing -func (e *EngineV2) BuildV2(ctx context.Context, deployAfterBuild bool) error { +func (e *EngineV2) handleModuleError(ctx context.Context, stateChange StateChange, err error) error { + logger := log.FromContext(ctx).Scope(stateChange.Module) + logger.Errorf(err, "Error in state %T", stateChange.Event.Event) + var event *buildenginepb.EngineEvent + + // You may need to get the config proto for the module + meta, ok := e.moduleMetas.Load(stateChange.Module) + var configProto *langpb.ModuleConfig + if ok { + configProto, err = langpb.ModuleConfigToProto(meta.module.Config.Abs()) + if err != nil { + logger.Errorf(err, "Failed to marshal module config") + return err + } + } + + switch stateChange.Event.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildStarted: + event = &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleBuildFailed{ + ModuleBuildFailed: &buildenginepb.ModuleBuildFailed{ + Config: configProto, + IsAutoRebuild: false, + Errors: &langpb.ErrorList{ + Errors: errorToLangError(err), + }, + }, + }, + } + case *buildenginepb.EngineEvent_ModuleDeployStarted: + event = &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleDeployFailed{ + ModuleDeployFailed: &buildenginepb.ModuleDeployFailed{ + Module: meta.module.Config.Module, + Errors: &langpb.ErrorList{ + Errors: errorToLangError(err), + }, + }, + }, + } + default: + return err + } + + return e.updateModuleState(ctx, stateChange.Module, event) +} + +// BuildV2 builds all modules and optionally deploys them. +func (e *EngineV2) BuildV2(ctx context.Context, deployAfterBuild bool, waitForDeployOnline bool) error { logger := log.FromContext(ctx).Scope("engine") logger.Infof("Building modules") @@ -322,7 +551,7 @@ func (e *EngineV2) BuildV2(ctx context.Context, deployAfterBuild bool) error { totalModules++ meta.deployAfterBuild = deployAfterBuild e.moduleMetas.Store(key, meta) - e.tryStartBuild(ctx, key) + e.startBuild(ctx, key) return true }) @@ -376,7 +605,7 @@ func (e *EngineV2) BuildV2(ctx context.Context, deployAfterBuild bool) error { } } -func (e *EngineV2) tryStartBuild(ctx context.Context, module string) error { +func (e *EngineV2) startBuild(ctx context.Context, module string) error { logger := log.FromContext(ctx).Scope("engine") meta, ok := e.moduleMetas.Load(module) @@ -395,10 +624,18 @@ func (e *EngineV2) tryStartBuild(ctx context.Context, module string) error { return nil } if !isBuildComplete(meta.state) { - logger.Warnf("Dependency %s not built for %s", dependency, module) + logger.Debugf("Dependency %s not built for %s", dependency, module) + configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) + if err != nil { + logger.Errorf(err, "Failed to marshal module config") + return err + } e.updateModuleState(ctx, module, &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ - ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{}, + ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{ + Config: configProto, + }, }, }) return nil @@ -407,9 +644,17 @@ func (e *EngineV2) tryStartBuild(ctx context.Context, module string) error { logger.Infof("Starting build for %s", module) + configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) + if err != nil { + logger.Errorf(err, "Failed to marshal module config") + return err + } e.updateModuleState(ctx, module, &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ - ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{}, + ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{ + Config: configProto, + }, }, }) @@ -435,6 +680,7 @@ func (e *EngineV2) initModuleMeta(ctx context.Context, config moduleconfig.Unval plugin: plugin, configDefaults: customDefaults, state: &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), Event: &buildenginepb.EngineEvent_ModuleAdded{ ModuleAdded: &buildenginepb.ModuleAdded{ Module: config.Module, @@ -454,7 +700,7 @@ func (e *EngineV2) initModuleMeta(ctx context.Context, config moduleconfig.Unval e.moduleMetas.Store(config.Module, &meta) e.stateChanges.Publish(StateChange{ Module: config.Module, - Event: &buildenginepb.EngineEvent{Event: &buildenginepb.EngineEvent_ModuleAdded{ModuleAdded: &buildenginepb.ModuleAdded{Module: config.Module}}}, + Event: meta.state, }) return nil From 8906c0ed1f6624e37319d1772a19726706517fcf Mon Sep 17 00:00:00 2001 From: Wes Date: Wed, 4 Jun 2025 12:03:25 -0700 Subject: [PATCH 3/4] feat: add back terminal status updates --- cmd/ftl/app.go | 7 ++++++ internal/buildengine/engine_v2.go | 20 ++-------------- internal/buildengine/terminal.go | 38 +++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/cmd/ftl/app.go b/cmd/ftl/app.go index acb5f9d1bb..68b727862a 100644 --- a/cmd/ftl/app.go +++ b/cmd/ftl/app.go @@ -25,6 +25,7 @@ import ( "github.com/block/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/block/ftl/common/log" "github.com/block/ftl/internal" + "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/buildengine/languageplugin" "github.com/block/ftl/internal/config" "github.com/block/ftl/internal/editor" @@ -278,6 +279,12 @@ func makeBindContext(logger *log.Logger, cancel context.CancelCauseFunc, csm *cu return schemaeventsource.New(ctx, "cli", adminClient), nil }) kctx.FatalIfErrorf(err) + + err = kctx.BindToProvider(func(adminClient adminpbconnect.AdminServiceClient) (buildengine.AdminClient, error) { + return adminClient, nil + }) + kctx.FatalIfErrorf(err) + kongcompletion.Register(kctx.Kong, kongcompletion.WithPredictors(terminal.Predictors(func() schemaeventsource.View { ac := rpc.Dial(adminpbconnect.NewAdminServiceClient, cli.AdminEndpoint.String(), log.Error) return schemaeventsource.New(ctx, "terminal-predicators", ac).ViewOnly() diff --git a/internal/buildengine/engine_v2.go b/internal/buildengine/engine_v2.go index 1ad8755090..5fbbedde70 100644 --- a/internal/buildengine/engine_v2.go +++ b/internal/buildengine/engine_v2.go @@ -149,6 +149,8 @@ func NewV2( option(e) } + updateTerminalWithEngineEventsV2(ctx, e.stateChanges) + // Ensure schema sync at startup if we have an admin client if e.adminClient != nil { info, err := adminClient.ClusterInfo(ctx, connect.NewRequest(&adminpb.ClusterInfoRequest{})) @@ -158,11 +160,6 @@ func NewV2( e.os = info.Msg.Os e.arch = info.Msg.Arch } - logger.Infof("Waiting for initial schema sync") - if !e.schemaSource.WaitForInitialSync(ctx) { - return nil, errors.Errorf("timed out waiting for initial schema sync from server") - } - logger.Infof("Initial schema sync complete") } // Discover modules @@ -277,19 +274,6 @@ func (e *EngineV2) processChanges(ctx context.Context) { switch stateChange.Event.Event.(type) { case *buildenginepb.EngineEvent_ModuleBuildWaiting: logger.Infof("Build waiting...") - proto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) - if err != nil { - logger.Errorf(err, "failed to marshal module config") - return err - } - e.updateModuleState(ctx, stateChange.Module, &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ - ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{ - Config: proto, - }, - }, - }) case *buildenginepb.EngineEvent_ModuleBuildStarted: logger.Infof("Building...") diff --git a/internal/buildengine/terminal.go b/internal/buildengine/terminal.go index b496aeda34..413e9b343d 100644 --- a/internal/buildengine/terminal.go +++ b/internal/buildengine/terminal.go @@ -47,3 +47,41 @@ func updateTerminalWithEngineEvents(ctx context.Context, topic *pubsub.Topic[*bu } }() } + +func updateTerminalWithEngineEventsV2(ctx context.Context, topic *pubsub.Topic[StateChange]) { + events := make(chan StateChange, 64) + topic.Subscribe(events) + + go func() { + defer topic.Unsubscribe(events) + for event := range channels.IterContext(ctx, events) { + switch evt := event.Event.Event.(type) { + case *buildenginepb.EngineEvent_EngineStarted: + case *buildenginepb.EngineEvent_EngineEnded: + + case *buildenginepb.EngineEvent_ModuleAdded: + terminal.UpdateModuleState(ctx, evt.ModuleAdded.Module, terminal.BuildStateWaiting) + case *buildenginepb.EngineEvent_ModuleRemoved: + terminal.UpdateModuleState(ctx, evt.ModuleRemoved.Module, terminal.BuildStateTerminated) + + case *buildenginepb.EngineEvent_ModuleBuildWaiting: + terminal.UpdateModuleState(ctx, evt.ModuleBuildWaiting.Config.Name, terminal.BuildStateWaiting) + case *buildenginepb.EngineEvent_ModuleBuildStarted: + terminal.UpdateModuleState(ctx, evt.ModuleBuildStarted.Config.Name, terminal.BuildStateBuilding) + case *buildenginepb.EngineEvent_ModuleBuildSuccess: + terminal.UpdateModuleState(ctx, evt.ModuleBuildSuccess.Config.Name, terminal.BuildStateBuilt) + case *buildenginepb.EngineEvent_ModuleBuildFailed: + terminal.UpdateModuleState(ctx, evt.ModuleBuildFailed.Config.Name, terminal.BuildStateFailed) + + case *buildenginepb.EngineEvent_ModuleDeployWaiting: + terminal.UpdateModuleState(ctx, evt.ModuleDeployWaiting.Module, terminal.BuildStateDeployWaiting) + case *buildenginepb.EngineEvent_ModuleDeployStarted: + terminal.UpdateModuleState(ctx, evt.ModuleDeployStarted.Module, terminal.BuildStateDeploying) + case *buildenginepb.EngineEvent_ModuleDeploySuccess: + terminal.UpdateModuleState(ctx, evt.ModuleDeploySuccess.Module, terminal.BuildStateDeployed) + case *buildenginepb.EngineEvent_ModuleDeployFailed: + terminal.UpdateModuleState(ctx, evt.ModuleDeployFailed.Module, terminal.BuildStateFailed) + } + } + }() +} From e0e6761cf2830ea2d29ed5214f1f98e00e8b7277 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 4 Jun 2025 22:52:38 +0000 Subject: [PATCH 4/4] chore(autofmt): Automated formatting --- internal/buildengine/engine_v2.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/buildengine/engine_v2.go b/internal/buildengine/engine_v2.go index 5fbbedde70..306cfdcb72 100644 --- a/internal/buildengine/engine_v2.go +++ b/internal/buildengine/engine_v2.go @@ -6,12 +6,14 @@ import ( "time" "connectrpc.com/connect" - "golang.org/x/exp/maps" - "google.golang.org/protobuf/types/known/timestamppb" - errors "github.com/alecthomas/errors" "github.com/alecthomas/types/pubsub" "github.com/alecthomas/types/result" + "github.com/puzpuzpuz/xsync/v3" + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/types/known/timestamppb" + adminpb "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1" buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1" langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" @@ -25,8 +27,6 @@ import ( "github.com/block/ftl/internal/realm" "github.com/block/ftl/internal/schema/schemaeventsource" "github.com/block/ftl/internal/watch" - "github.com/puzpuzpuz/xsync/v3" - "golang.org/x/sync/errgroup" ) // isBuildComplete returns true if the module is considered complete based on the deployAfterBuild flag