diff --git a/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.pb.go b/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.pb.go index 80cdfe7c15..3bb3779bee 100644 --- a/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.pb.go +++ b/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.pb.go @@ -246,7 +246,6 @@ func (x *ModuleBuildWaiting) GetConfig() *v1.ModuleConfig { type ModuleBuildStarted struct { state protoimpl.MessageState `protogen:"open.v1"` Config *v1.ModuleConfig `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` - IsAutoRebuild bool `protobuf:"varint,2,opt,name=is_auto_rebuild,json=isAutoRebuild,proto3" json:"is_auto_rebuild,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -288,19 +287,11 @@ func (x *ModuleBuildStarted) GetConfig() *v1.ModuleConfig { return nil } -func (x *ModuleBuildStarted) GetIsAutoRebuild() bool { - if x != nil { - return x.IsAutoRebuild - } - return false -} - // ModuleBuildFailed is published for any build failures. type ModuleBuildFailed struct { state protoimpl.MessageState `protogen:"open.v1"` Config *v1.ModuleConfig `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` Errors *v1.ErrorList `protobuf:"bytes,2,opt,name=errors,proto3" json:"errors,omitempty"` - IsAutoRebuild bool `protobuf:"varint,3,opt,name=is_auto_rebuild,json=isAutoRebuild,proto3" json:"is_auto_rebuild,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -349,18 +340,10 @@ func (x *ModuleBuildFailed) GetErrors() *v1.ErrorList { return nil } -func (x *ModuleBuildFailed) GetIsAutoRebuild() bool { - if x != nil { - return x.IsAutoRebuild - } - return false -} - // ModuleBuildSuccess is published when all modules have been built successfully built. 
type ModuleBuildSuccess struct { state protoimpl.MessageState `protogen:"open.v1"` Config *v1.ModuleConfig `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` - IsAutoRebuild bool `protobuf:"varint,2,opt,name=is_auto_rebuild,json=isAutoRebuild,proto3" json:"is_auto_rebuild,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -402,13 +385,6 @@ func (x *ModuleBuildSuccess) GetConfig() *v1.ModuleConfig { return nil } -func (x *ModuleBuildSuccess) GetIsAutoRebuild() bool { - if x != nil { - return x.IsAutoRebuild - } - return false -} - // ModuleDeployStarted is published when a deploy has been queued type ModuleDeployWaiting struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1068,17 +1044,14 @@ const file_xyz_block_ftl_buildengine_v1_buildengine_proto_rawDesc = "" + "\rModuleRemoved\x12\x16\n" + "\x06module\x18\x01 \x01(\tR\x06module\"U\n" + "\x12ModuleBuildWaiting\x12?\n" + - "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\"}\n" + + "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\"U\n" + "\x12ModuleBuildStarted\x12?\n" + - "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\x12&\n" + - "\x0fis_auto_rebuild\x18\x02 \x01(\bR\risAutoRebuild\"\xba\x01\n" + + "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\"\x92\x01\n" + "\x11ModuleBuildFailed\x12?\n" + "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\x12<\n" + - "\x06errors\x18\x02 \x01(\v2$.xyz.block.ftl.language.v1.ErrorListR\x06errors\x12&\n" + - "\x0fis_auto_rebuild\x18\x03 \x01(\bR\risAutoRebuild\"}\n" + + "\x06errors\x18\x02 \x01(\v2$.xyz.block.ftl.language.v1.ErrorListR\x06errors\"U\n" + "\x12ModuleBuildSuccess\x12?\n" + - "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\x12&\n" + - "\x0fis_auto_rebuild\x18\x02 \x01(\bR\risAutoRebuild\"-\n" + + "\x06config\x18\x01 \x01(\v2'.xyz.block.ftl.language.v1.ModuleConfigR\x06config\"-\n" + "\x13ModuleDeployWaiting\x12\x16\n" + "\x06module\x18\x01 \x01(\tR\x06module\"-\n" + "\x13ModuleDeployStarted\x12\x16\n" + diff --git a/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.proto b/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.proto index 4c7c58a963..d0fdf15ad8 100644 --- a/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.proto +++ b/backend/protos/xyz/block/ftl/buildengine/v1/buildengine.proto @@ -40,20 +40,17 @@ message ModuleBuildWaiting { // ModuleBuildStarted is published when a build has started for a module. message ModuleBuildStarted { xyz.block.ftl.language.v1.ModuleConfig config = 1; - bool is_auto_rebuild = 2; } // ModuleBuildFailed is published for any build failures. message ModuleBuildFailed { xyz.block.ftl.language.v1.ModuleConfig config = 1; xyz.block.ftl.language.v1.ErrorList errors = 2; - bool is_auto_rebuild = 3; } // ModuleBuildSuccess is published when all modules have been built successfully built. 
message ModuleBuildSuccess { xyz.block.ftl.language.v1.ModuleConfig config = 1; - bool is_auto_rebuild = 2; } // ModuleDeployStarted is published when a deploy has been queued diff --git a/cmd/ftl/cmd_build.go b/cmd/ftl/cmd_build.go index dd1ba61bdf..5b2f78a612 100644 --- a/cmd/ftl/cmd_build.go +++ b/cmd/ftl/cmd_build.go @@ -6,7 +6,6 @@ import ( errors "github.com/alecthomas/errors" "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1/adminpbconnect" - "github.com/block/ftl/common/log" "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/schema/schemaeventsource" @@ -24,7 +23,7 @@ func (b *buildCmd) Run( schemaSource *schemaeventsource.EventSource, projConfig projectconfig.Config, ) error { - logger := log.FromContext(ctx) + // logger := log.FromContext(ctx) if len(b.Dirs) == 0 { b.Dirs = projConfig.AbsModuleDirs() } @@ -38,7 +37,6 @@ func (b *buildCmd) Run( engine, err := buildengine.New( ctx, adminClient, - schemaSource, projConfig, b.Dirs, false, @@ -48,11 +46,11 @@ func (b *buildCmd) Run( if err != nil { return errors.WithStack(err) } - if len(engine.Modules()) == 0 { - logger.Warnf("No modules were found to build") - return nil - } - if err := engine.Build(ctx); err != nil { + // if len(engine.Modules()) == 0 { + // logger.Warnf("No modules were found to build") + // return nil + // } + if err := engine.Build(ctx, schemaSource); err != nil { return errors.Wrap(err, "build failed") } return nil diff --git a/cmd/ftl/cmd_buildimage.go b/cmd/ftl/cmd_buildimage.go index ea973c3c6b..65f2829446 100644 --- a/cmd/ftl/cmd_buildimage.go +++ b/cmd/ftl/cmd_buildimage.go @@ -2,17 +2,8 @@ package main import ( "context" - "os" - "path/filepath" - errors "github.com/alecthomas/errors" - - "github.com/block/ftl" "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1/adminpbconnect" - "github.com/block/ftl/common/key" - "github.com/block/ftl/common/log" - "github.com/block/ftl/common/schema" - "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/oci" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/schema/schemaeventsource" @@ -36,94 +27,96 @@ func (b *buildImageCmd) Run( schemaSource *schemaeventsource.EventSource, projConfig projectconfig.Config, ) error { - logger := log.FromContext(ctx) - if len(b.Dirs) == 0 { - b.Dirs = projConfig.AbsModuleDirs() - } - if len(b.Dirs) == 0 { - return errors.WithStack(errors.New("no directories specified")) - } + // TODO: redo all this + + // logger := log.FromContext(ctx) + // if len(b.Dirs) == 0 { + // b.Dirs = projConfig.AbsModuleDirs() + // } + // if len(b.Dirs) == 0 { + // return errors.WithStack(errors.New("no directories specified")) + // } - // Cancel build engine context to ensure all language plugins are killed. 
- ctx, cancel := context.WithCancelCause(ctx) - defer cancel(errors.Wrap(context.Canceled, "build stopped")) - engine, err := buildengine.New( - ctx, - adminClient, - schemaSource, - projConfig, - b.Dirs, - false, - buildengine.BuildEnv(b.BuildEnv), - buildengine.Parallelism(b.Parallelism), - ) - if err != nil { - return errors.WithStack(err) - } - if len(engine.Modules()) == 0 { - logger.Warnf("No modules were found to build") - return nil - } - artefactService, err := oci.NewArtefactService(ctx, b.ArtefactConfig) - if err != nil { - return errors.Wrapf(err, "failed to init artefact service") - } - imageService, err := oci.NewImageService(ctx, artefactService, &b.ImageConfig) - if err != nil { - return errors.Wrapf(err, "failed to init OCI") - } - if err := engine.BuildWithCallback(ctx, func(ctx context.Context, module buildengine.Module, moduleSch *schema.Module, tmpDeployDir string, deployPaths []string) error { - artifacts := []*schema.MetadataArtefact{} + // // Cancel build engine context to ensure all language plugins are killed. + // ctx, cancel := context.WithCancelCause(ctx) + // defer cancel(errors.Wrap(context.Canceled, "build stopped")) + // engine, err := buildengine.New( + // ctx, + // adminClient, + // schemaSource, + // projConfig, + // b.Dirs, + // false, + // buildengine.BuildEnv(b.BuildEnv), + // buildengine.Parallelism(b.Parallelism), + // ) + // if err != nil { + // return errors.WithStack(err) + // } + // if len(engine.Modules()) == 0 { + // logger.Warnf("No modules were found to build") + // return nil + // } + // artefactService, err := oci.NewArtefactService(ctx, b.ArtefactConfig) + // if err != nil { + // return errors.Wrapf(err, "failed to init artefact service") + // } + // imageService, err := oci.NewImageService(ctx, artefactService, &b.ImageConfig) + // if err != nil { + // return errors.Wrapf(err, "failed to init OCI") + // } + // if err := engine.BuildWithCallback(ctx, func(ctx context.Context, module buildengine.Module, moduleSch *schema.Module, tmpDeployDir string, deployPaths []string) error { + // artifacts := []*schema.MetadataArtefact{} - for _, i := range deployPaths { - s, err := os.Stat(i) - if err != nil { - return errors.Wrapf(err, "failed to stat file") - } + // for _, i := range deployPaths { + // s, err := os.Stat(i) + // if err != nil { + // return errors.Wrapf(err, "failed to stat file") + // } - path, err := filepath.Rel(tmpDeployDir, i) - if err != nil { - return errors.Wrapf(err, "failed to resolve file") - } - executable := s.Mode().Perm()&0111 != 0 - artifacts = append(artifacts, &schema.MetadataArtefact{Path: path, Executable: executable}) - } - var image string - if b.RunnerImage != "" { - image = b.RunnerImage - } else { - image = "ftl0/ftl-runner" - if moduleSch.ModRuntime().Base.Image != "" { - image = moduleSch.ModRuntime().Base.Image - } - image += ":" - if ftl.IsRelease(ftl.Version) && ftl.Version == ftl.BaseVersion(ftl.Version) { - image += "v" - image += ftl.Version - } else { - image += "latest" - } - } - tgt := string(b.ArtefactConfig.Repository) - tgt += ":" - tgt += b.Tag - targets := []oci.ImageTarget{} - if !b.SkipLocalDaemon { - targets = append(targets, oci.WithLocalDeamon()) - } - if b.Push { - targets = append(targets, oci.WithRemotePush()) - } - // TODO: we need to properly sync the deployment with the actual deployment key - // this is just a hack to get the module and realm to the runner - deployment := key.NewDeploymentKey(projConfig.Name, moduleSch.Name) - err := imageService.BuildOCIImage(ctx, image, tgt, 
tmpDeployDir, deployment, artifacts, targets...) - if err != nil { - return errors.Wrapf(err, "failed to build image") - } - return nil - }); err != nil { - return errors.Wrap(err, "build failed") - } + // path, err := filepath.Rel(tmpDeployDir, i) + // if err != nil { + // return errors.Wrapf(err, "failed to resolve file") + // } + // executable := s.Mode().Perm()&0111 != 0 + // artifacts = append(artifacts, &schema.MetadataArtefact{Path: path, Executable: executable}) + // } + // var image string + // if b.RunnerImage != "" { + // image = b.RunnerImage + // } else { + // image = "ftl0/ftl-runner" + // if moduleSch.ModRuntime().Base.Image != "" { + // image = moduleSch.ModRuntime().Base.Image + // } + // image += ":" + // if ftl.IsRelease(ftl.Version) && ftl.Version == ftl.BaseVersion(ftl.Version) { + // image += "v" + // image += ftl.Version + // } else { + // image += "latest" + // } + // } + // tgt := string(b.ArtefactConfig.Repository) + // tgt += ":" + // tgt += b.Tag + // targets := []oci.ImageTarget{} + // if !b.SkipLocalDaemon { + // targets = append(targets, oci.WithLocalDeamon()) + // } + // if b.Push { + // targets = append(targets, oci.WithRemotePush()) + // } + // // TODO: we need to properly sync the deployment with the actual deployment key + // // this is just a hack to get the module and realm to the runner + // deployment := key.NewDeploymentKey(projConfig.Name, moduleSch.Name) + // err := imageService.BuildOCIImage(ctx, image, tgt, tmpDeployDir, deployment, artifacts, targets...) + // if err != nil { + // return errors.Wrapf(err, "failed to build image") + // } + // return nil + // }); err != nil { + // return errors.Wrap(err, "build failed") + // } return nil } diff --git a/cmd/ftl/cmd_deploy.go b/cmd/ftl/cmd_deploy.go index 4f9a9fa03f..15050df027 100644 --- a/cmd/ftl/cmd_deploy.go +++ b/cmd/ftl/cmd_deploy.go @@ -8,11 +8,9 @@ import ( "github.com/alecthomas/types/optional" "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1/adminpbconnect" - "github.com/block/ftl/common/log" "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/schema/schemaeventsource" - "github.com/block/ftl/internal/terminal" ) type deployCmd struct { @@ -28,12 +26,18 @@ func (d *deployCmd) Run( adminClient adminpbconnect.AdminServiceClient, schemaSource *schemaeventsource.EventSource, ) error { - logger := log.FromContext(ctx) + // logger := log.FromContext(ctx) + if len(d.Build.Dirs) == 0 { + d.Build.Dirs = projConfig.AbsModuleDirs() + } + if len(d.Build.Dirs) == 0 { + return errors.WithStack(errors.New("no directories specified")) + } if !schemaSource.WaitForInitialSync(ctx) { return errors.Errorf("timed out waiting for schema sync from server") } - // Cancel build engine context to ensure all language plugins are killed. 
+ if d.Timeout > 0 { var cancel context.CancelFunc //nolint: forbidigo ctx, cancel = context.WithTimeoutCause(ctx, d.Timeout, errors.Errorf("terminating deploy due to timeout of %s", d.Timeout)) @@ -43,23 +47,25 @@ func (d *deployCmd) Run( ctx, cancel = context.WithCancelCause(ctx) defer cancel(errors.Wrap(context.Canceled, "stopping deploy")) } + engine, err := buildengine.New( - ctx, adminClient, schemaSource, projConfig, d.Build.Dirs, true, + ctx, + adminClient, + projConfig, + d.Build.Dirs, + false, buildengine.BuildEnv(d.Build.BuildEnv), buildengine.Parallelism(d.Build.Parallelism), ) if err != nil { return errors.WithStack(err) } - if len(engine.Modules()) == 0 { - logger.Warnf("No modules were found to deploy") - return nil - } - err = engine.BuildAndDeploy(ctx, d.Replicas, !d.NoWait, true) - if err != nil { - return errors.Wrap(err, "failed to deploy") + // if len(engine.Modules()) == 0 { + // logger.Warnf("No modules were found to build") + // return nil + // } + if err := engine.BuildAndDeploy(ctx, schemaSource, d.Replicas); err != nil { + return errors.Wrap(err, "build failed") } - logger.Infof("Deployed modules %v", engine.Modules()) //nolint - terminal.FromContext(ctx).Close() return nil } diff --git a/cmd/ftl/cmd_dev.go b/cmd/ftl/cmd_dev.go index b7d6e0f576..f850581b89 100644 --- a/cmd/ftl/cmd_dev.go +++ b/cmd/ftl/cmd_dev.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "os" "path/filepath" "strings" @@ -105,7 +106,7 @@ func (d *devCmd) Run( devModeEndpointUpdates := make(chan dev.LocalEndpoint, 1) opts := []buildengine.Option{buildengine.Parallelism(d.Build.Parallelism), buildengine.BuildEnv(d.Build.BuildEnv), buildengine.WithDevMode(devModeEndpointUpdates), buildengine.WithStartTime(startTime)} - engine, err := buildengine.New(ctx, deployClient, source, projConfig, d.Build.Dirs, false, opts...) + engine, err := buildengine.New(ctx, deployClient, projConfig, d.Build.Dirs, false, opts...) 
if err != nil { return errors.WithStack(err) } @@ -122,7 +123,9 @@ func (d *devCmd) Run( } g.Go(func() error { + fmt.Printf("d.ServeCmd.run\n") err := d.ServeCmd.run(ctx, projConfig, cm, sm, optional.Some(controllerReady), true, bindAllocator, devModeEndpointUpdates, []rpc.Service{engine}) + fmt.Printf("d.ServeCmd.run finished: %v\n", err) if err != nil { cancel(errors.Wrap(errors.Join(err, context.Canceled), "dev server failed")) } else { @@ -139,8 +142,7 @@ func (d *devCmd) Run( case <-controllerReady: } starting.Close() - - return errors.WithStack(engine.Dev(ctx, d.Watch)) + return errors.WithStack(engine.Dev(ctx, d.Watch, source)) }) err = g.Wait() diff --git a/cmd/ftl/cmd_schema_save.go b/cmd/ftl/cmd_schema_save.go index 61de0d5a82..14d68cca49 100644 --- a/cmd/ftl/cmd_schema_save.go +++ b/cmd/ftl/cmd_schema_save.go @@ -2,15 +2,8 @@ package main import ( "context" - "os" - "path/filepath" - - errors "github.com/alecthomas/errors" - "google.golang.org/protobuf/encoding/protojson" "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1/adminpbconnect" - "github.com/block/ftl/common/log" - "github.com/block/ftl/internal/buildengine" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/schema/schemaeventsource" ) @@ -28,54 +21,56 @@ func (s *schemaSaveCmd) Run( schemaSource *schemaeventsource.EventSource, projConfig projectconfig.Config, ) error { - logger := log.FromContext(ctx) - if len(s.Dirs) == 0 { - s.Dirs = projConfig.AbsModuleDirs() - } - if len(s.Dirs) == 0 { - return errors.WithStack(errors.New("no directories specified")) - } + // TODO: reimplement this + + // logger := log.FromContext(ctx) + // if len(s.Dirs) == 0 { + // s.Dirs = projConfig.AbsModuleDirs() + // } + // if len(s.Dirs) == 0 { + // return errors.WithStack(errors.New("no directories specified")) + // } - // Cancel build engine context to ensure all language plugins are killed. - ctx, cancel := context.WithCancelCause(ctx) - defer cancel(errors.Wrap(context.Canceled, "build stopped")) - engine, err := buildengine.New( - ctx, - adminClient, - schemaSource, - projConfig, - s.Dirs, - false, - buildengine.BuildEnv(s.BuildEnv), - buildengine.Parallelism(s.Parallelism), - ) - if err != nil { - return errors.WithStack(err) - } - if len(engine.Modules()) == 0 { - logger.Warnf("No modules were found to build") - return nil - } - if err := engine.Build(ctx); err != nil { - return errors.Wrap(err, "build failed") - } - sch, ok := engine.GetSchema() - if !ok { - return errors.New("schema not found") - } - pb := sch.ToProto() - js, err := protojson.Marshal(pb) - if err != nil { - return errors.Wrap(err, "failed to JSON encode schema") - } - dest := s.Dest - if dest == "" { - dest = filepath.Join(projConfig.Root(), "ftl-schema.json") - } - err = os.WriteFile(dest, js, 0600) - if err != nil { - return errors.Wrap(err, "failed to save schema") - } - logger.Infof("Wrote schema to %q", dest) //nolint + // // Cancel build engine context to ensure all language plugins are killed. 
+ // ctx, cancel := context.WithCancelCause(ctx) + // defer cancel(errors.Wrap(context.Canceled, "build stopped")) + // engine, err := buildengine.New( + // ctx, + // adminClient, + // schemaSource, + // projConfig, + // s.Dirs, + // false, + // buildengine.BuildEnv(s.BuildEnv), + // buildengine.Parallelism(s.Parallelism), + // ) + // if err != nil { + // return errors.WithStack(err) + // } + // if len(engine.Modules()) == 0 { + // logger.Warnf("No modules were found to build") + // return nil + // } + // if err := engine.Build(ctx); err != nil { + // return errors.Wrap(err, "build failed") + // } + // sch, ok := engine.GetSchema() + // if !ok { + // return errors.New("schema not found") + // } + // pb := sch.ToProto() + // js, err := protojson.Marshal(pb) + // if err != nil { + // return errors.Wrap(err, "failed to JSON encode schema") + // } + // dest := s.Dest + // if dest == "" { + // dest = filepath.Join(projConfig.Root(), "ftl-schema.json") + // } + // err = os.WriteFile(dest, js, 0600) + // if err != nil { + // return errors.Wrap(err, "failed to save schema") + // } + // logger.Infof("Wrote schema to %q", dest) //nolint return nil } diff --git a/common/schema/moduleref.go b/common/schema/moduleref.go new file mode 100644 index 0000000000..c2ab837ace --- /dev/null +++ b/common/schema/moduleref.go @@ -0,0 +1,8 @@ +package schema + +// ModuleRefKey is a map key for a module in a realm. +// TODO: remove. Assume internal for now +type ModuleRefKey struct { + Realm string `parser:"(@Ident '.')?"` + Module string `parser:"@Ident"` +} diff --git a/common/schema/validate.go b/common/schema/validate.go index aef9ef7081..7a98c28224 100644 --- a/common/schema/validate.go +++ b/common/schema/validate.go @@ -256,7 +256,7 @@ func ValidateModuleInRealm(realm *Realm, m optional.Option[*Module]) (*Realm, er for _, call := range md.Calls { resolved := scopes.Resolve(*call) if resolved == nil { - merr = append(merr, errorf(call, "unknown call %q", call)) + merr = append(merr, errorf(call, "unknown call %q: %s", call, realm)) continue } verb, ok := resolved.Symbol.(*Verb) diff --git a/frontend/console/src/features/engine/engine.utils.ts b/frontend/console/src/features/engine/engine.utils.ts index ff3e2e5596..03aeff0c37 100644 --- a/frontend/console/src/features/engine/engine.utils.ts +++ b/frontend/console/src/features/engine/engine.utils.ts @@ -40,7 +40,7 @@ export const getEventText = (event: EngineEvent | undefined): string => { case 'moduleBuildWaiting': return 'Build waiting' case 'moduleBuildStarted': - return `Build started${event.event.value.isAutoRebuild ? 
' (auto rebuild)' : ''}` + return 'Build started' case 'moduleBuildFailed': return 'Build failed' case 'moduleBuildSuccess': diff --git a/frontend/console/src/protos/xyz/block/ftl/buildengine/v1/buildengine_pb.ts b/frontend/console/src/protos/xyz/block/ftl/buildengine/v1/buildengine_pb.ts index 9d54265187..1481b4ece0 100644 --- a/frontend/console/src/protos/xyz/block/ftl/buildengine/v1/buildengine_pb.ts +++ b/frontend/console/src/protos/xyz/block/ftl/buildengine/v1/buildengine_pb.ts @@ -257,11 +257,6 @@ export class ModuleBuildStarted extends Message { */ config?: ModuleConfig; - /** - * @generated from field: bool is_auto_rebuild = 2; - */ - isAutoRebuild = false; - constructor(data?: PartialMessage) { super(); proto3.util.initPartial(data, this); @@ -271,7 +266,6 @@ export class ModuleBuildStarted extends Message { static readonly typeName = "xyz.block.ftl.buildengine.v1.ModuleBuildStarted"; static readonly fields: FieldList = proto3.util.newFieldList(() => [ { no: 1, name: "config", kind: "message", T: ModuleConfig }, - { no: 2, name: "is_auto_rebuild", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): ModuleBuildStarted { @@ -307,11 +301,6 @@ export class ModuleBuildFailed extends Message { */ errors?: ErrorList; - /** - * @generated from field: bool is_auto_rebuild = 3; - */ - isAutoRebuild = false; - constructor(data?: PartialMessage) { super(); proto3.util.initPartial(data, this); @@ -322,7 +311,6 @@ export class ModuleBuildFailed extends Message { static readonly fields: FieldList = proto3.util.newFieldList(() => [ { no: 1, name: "config", kind: "message", T: ModuleConfig }, { no: 2, name: "errors", kind: "message", T: ErrorList }, - { no: 3, name: "is_auto_rebuild", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): ModuleBuildFailed { @@ -353,11 +341,6 @@ export class ModuleBuildSuccess extends Message { */ config?: ModuleConfig; - /** - * @generated from field: bool is_auto_rebuild = 2; - */ - isAutoRebuild = false; - constructor(data?: PartialMessage) { super(); proto3.util.initPartial(data, this); @@ -367,7 +350,6 @@ export class ModuleBuildSuccess extends Message { static readonly typeName = "xyz.block.ftl.buildengine.v1.ModuleBuildSuccess"; static readonly fields: FieldList = proto3.util.newFieldList(() => [ { no: 1, name: "config", kind: "message", T: ModuleConfig }, - { no: 2, name: "is_auto_rebuild", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): ModuleBuildSuccess { diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index 09fe655d75..624eea9908 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -21,6 +21,7 @@ import ( "github.com/block/ftl/internal/exec" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/sql" + "github.com/block/ftl/internal/watch" ) const ( @@ -30,29 +31,60 @@ const ( var errInvalidateDependencies = errors.New("dependencies need to be updated") var errSQLError = errors.New("failed to add queries to schema") -// Build a module in the given directory given the schema and module config. -// -// Plugins must use a lock file to ensure that only one build is running at a time. -// -// Returns invalidateDependenciesError if the build failed due to a change in dependencies. 
-func build(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { - logger := log.FromContext(ctx).Module(bctx.Config.Module).Scope("build") +type transactionProviderFunc func() watch.ModifyFilesTransaction +type buildFunc func(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint, fileTransaction watch.ModifyFilesTransaction, outChan chan internalEvent) + +func buildModuleAndPublish(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint, fileTransaction watch.ModifyFilesTransaction, outChan chan internalEvent) { + moduleSchema, tmpDeployDir, deployPaths, err := buildModule(ctx, projectConfig, m, plugin, bctx, devMode, devModeEndpoints, fileTransaction) + outChan <- moduleBuildEndedEvent{ + config: bctx.Config, + moduleSchema: moduleSchema, + tmpDeployDir: tmpDeployDir, + deployPaths: deployPaths, + err: err, + } +} + +func buildModule(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint, fileTransaction watch.ModifyFilesTransaction) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { + logger := log.FromContext(ctx).Module(m.Config.Module).Scope("build") ctx = log.ContextWithLogger(ctx, logger) - err = sql.AddDatabaseDeclsToSchema(ctx, projectConfig.Root(), bctx.Config.Abs(), bctx.Schema) + if err = fileTransaction.Begin(); err != nil { + return nil, "", nil, errors.WithStack(errors.Wrap(err, "failed to begin file transaction")) + } + defer func() { + if transactionErr := fileTransaction.End(); err != nil { + if err == nil { + moduleSchema, tmpDeployDir, deployPaths, err = nil, "", nil, transactionErr + } + } + }() + + // TODO: input enough info to know if sql files have changed + err = sql.AddDatabaseDeclsToSchema(ctx, projectConfig.Root(), m.Config.Abs(), bctx.Schema) if err != nil { return nil, "", nil, errors.WithStack(errors.Join(errSQLError, err)) } - stubsRoot := stubsLanguageDir(projectConfig.Root(), bctx.Config.Language) - moduleSchema, tmpDeployDir, deployPaths, err = handleBuildResult(ctx, projectConfig, m, result.From(plugin.Build(ctx, projectConfig, stubsRoot, bctx, devMode)), devMode, devModeEndpoints, optional.Some(bctx.Schema)) + + stubsRoot := stubsLanguageDir(projectConfig.Root(), m.Config.Language) + moduleSchema, tmpDeployDir, deployPaths, err = handleBuildResult(ctx, projectConfig, m, fileTransaction, result.From(plugin.Build(ctx, projectConfig, stubsRoot, bctx, devMode)), devMode, devModeEndpoints, optional.Some(bctx.Schema)) if err != nil { return nil, "", nil, errors.WithStack(err) } return moduleSchema, tmpDeployDir, deployPaths, nil } +// // Build a module in the given directory given the schema and module config. +// // +// // Plugins must use a lock file to ensure that only one build is running at a time. +// // +// // Returns invalidateDependenciesError if the build failed due to a change in dependencies. 
+// func build(ctx context.Context, projectConfig projectconfig.Config, m Module, plugin *languageplugin.LanguagePlugin, fileTransaction watch.ModifyFilesTransaction, bctx languageplugin.BuildContext, devMode bool, devModeEndpoints chan dev.LocalEndpoint) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { + +// } + // handleBuildResult processes the result of a build -func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, m Module, eitherResult result.Result[languageplugin.BuildResult], devMode bool, devModeEndpoints chan dev.LocalEndpoint, schemaOpt optional.Option[*schema.Schema]) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { +func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, m Module, fileTransaction watch.ModifyFilesTransaction, eitherResult result.Result[languageplugin.BuildResult], devMode bool, devModeEndpoints chan dev.LocalEndpoint, schemaOpt optional.Option[*schema.Schema]) (moduleSchema *schema.Module, tmpDeployDir string, deployPaths []string, err error) { logger := log.FromContext(ctx) config := m.Config.Abs() @@ -61,6 +93,10 @@ func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, return nil, "", nil, errors.Wrap(err, "failed to build module") } + if err := fileTransaction.ModifiedFiles(result.ModifiedFiles...); err != nil { + return nil, "", nil, errors.Wrap(err, "failed to apply modified files") + } + if result.InvalidateDependencies { return nil, "", nil, errors.WithStack(errInvalidateDependencies) } diff --git a/internal/buildengine/deploy.go b/internal/buildengine/deploy.go index b9ffd0b813..539a3f2644 100644 --- a/internal/buildengine/deploy.go +++ b/internal/buildengine/deploy.go @@ -15,13 +15,9 @@ import ( errors "github.com/alecthomas/errors" "github.com/alecthomas/types/optional" "github.com/alecthomas/types/result" - "github.com/puzpuzpuz/xsync/v3" "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/types/known/timestamppb" adminpb "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1" - buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1" - langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1" ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1" "github.com/block/ftl/common/key" @@ -44,10 +40,6 @@ type AdminClient interface { Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) } -type DependencyGrapher interface { - Graph(moduleNames ...string) (map[string][]string, error) -} - type pendingModule struct { module Module @@ -87,7 +79,7 @@ type pendingDeploy struct { type SchemaUpdatedEvent struct { schema *schema.Schema // marks which modules were changed together (ie. in the same changeset or queued together) - updatedModules map[string]bool + updatedModules []schema.ModuleRefKey } // DeployCoordinator manages the deployment of modules through changesets. It ensures that changesets are deployed @@ -99,12 +91,11 @@ type SchemaUpdatedEvent struct { // but publish it as part of the its schema. This allows the build engine to react and build module A against the new schema for module B. // The DeployCoordinator will then create a changeset of A and B together. 
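The DeployCoordinator described above serialises deployments through a single queue: callers push a pendingDeploy that carries its own error channel, and the deploy method blocks on that channel (or on context cancellation) while the processEvents goroutine works through the queue. A minimal, self-contained sketch of that request-with-reply-channel pattern, using invented names rather than the FTL types:

package main

import (
	"context"
	"fmt"
)

// request pairs a unit of work with the channel its result is reported on,
// mirroring how pendingDeploy carries an err channel in the patch above.
type request struct {
	modules []string
	err     chan error
}

// coordinator is the single worker that drains the queue; every request is
// answered on its own error channel.
func coordinator(ctx context.Context, queue <-chan request) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-queue:
			// Real deployment logic would run here.
			req.err <- nil
		}
	}
}

// submit enqueues work and waits for either a result or cancellation, the same
// shape as the select at the end of DeployCoordinator.deploy.
func submit(ctx context.Context, queue chan<- request, modules []string) error {
	errCh := make(chan error, 1)
	queue <- request{modules: modules, err: errCh}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errCh:
		return err
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := make(chan request, 16)
	go coordinator(ctx, queue)

	if err := submit(ctx, queue, []string{"echo", "time"}); err != nil {
		fmt.Println("deploy failed:", err)
		return
	}
	fmt.Println("deploy succeeded")
}

The per-request error channel is what lets many callers share one serialised worker while still getting an individual success or failure result back.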
type DeployCoordinator struct { - adminClient AdminClient - schemaSource *schemaeventsource.EventSource - dependencyGrapher DependencyGrapher + adminClient AdminClient + schemaSource *schemaeventsource.EventSource // for publishing deploy events - engineUpdates chan *buildenginepb.EngineEvent + deployUpdates chan internalEvent // deployment queue and state tracking deploymentQueue chan pendingDeploy @@ -121,59 +112,42 @@ func NewDeployCoordinator( ctx context.Context, adminClient AdminClient, schemaSource *schemaeventsource.EventSource, - dependencyGrapher DependencyGrapher, - engineUpdates chan *buildenginepb.EngineEvent, + deployUpdates chan internalEvent, logChanges bool, projectConfig projectconfig.Config, - externalRealms *xsync.MapOf[string, *schema.Realm], + externalRealms []*schema.Realm, ) *DeployCoordinator { c := &DeployCoordinator{ - adminClient: adminClient, - schemaSource: schemaSource, - dependencyGrapher: dependencyGrapher, - engineUpdates: engineUpdates, - deploymentQueue: make(chan pendingDeploy, 128), - SchemaUpdates: make(chan SchemaUpdatedEvent, 128), - logChanges: logChanges, - projectConfig: projectConfig, + adminClient: adminClient, + schemaSource: schemaSource, + deployUpdates: deployUpdates, + deploymentQueue: make(chan pendingDeploy, 128), + SchemaUpdates: make(chan SchemaUpdatedEvent, 128), + logChanges: logChanges, + projectConfig: projectConfig, + externalRealms: externalRealms, } - externalRealms.Range(func(key string, value *schema.Realm) bool { - c.externalRealms = append(c.externalRealms, value) - return true - }) - // Start the deployment queue processor go c.processEvents(ctx) return c } -func (c *DeployCoordinator) deploy(ctx context.Context, modules []*pendingModule, replicas optional.Option[int32]) error { +func (c *DeployCoordinator) deploy(ctx context.Context, modules map[string]*pendingModule, replicas optional.Option[int32]) error { logger := log.FromContext(ctx) - for _, module := range modules { - c.engineUpdates <- &buildenginepb.EngineEvent{ - Event: &buildenginepb.EngineEvent_ModuleDeployWaiting{ - ModuleDeployWaiting: &buildenginepb.ModuleDeployWaiting{ - Module: module.module.Config.Module, - }, - }, - } - } - pendingModules := make(map[string]*pendingModule, len(modules)) - for _, m := range modules { - pendingModules[m.moduleName()] = m - defer func() { - if err := os.RemoveAll(m.tmpDeployDir); err != nil { - logger.Errorf(err, "failed to remove tmp deploy dir %s", m.tmpDeployDir) + defer func() { + for _, module := range modules { + if err := os.RemoveAll(module.tmpDeployDir); err != nil { + logger.Errorf(err, "failed to remove tmp deploy dir %s", module.tmpDeployDir) } - }() - } + } + }() errChan := make(chan error, 1) c.deploymentQueue <- pendingDeploy{ - modules: pendingModules, + modules: modules, replicas: replicas, err: errChan} select { @@ -181,7 +155,9 @@ func (c *DeployCoordinator) deploy(ctx context.Context, modules []*pendingModule return errors.WithStack(ctx.Err()) //nolint:wrapcheck case err := <-errChan: if err != nil { - logger.Errorf(err, "Failed to deploy %s", strings.Join(slices.Map(modules, func(m *pendingModule) string { return m.moduleName() }), ", ")) + for _, module := range modules { + logger.Errorf(err, "Failed to deploy %s", module) + } } return errors.WithStack(err) } @@ -262,7 +238,7 @@ func (c *DeployCoordinator) processEvents(ctx context.Context) { // Check for modules that need to be rebuilt for this change to be valid // Try and deploy, unless there are conflicting changesets this will happen immediately - 
graph, err := c.dependencyGrapher.Graph() + graph, err := c.dependencyGraphForDeploymentState(toDeploy, deploying, deployment) if err != nil { log.FromContext(ctx).Errorf(err, "could not build graph to order deployment") continue @@ -280,9 +256,10 @@ func (c *DeployCoordinator) processEvents(ctx context.Context) { modulesToValidate = append(modulesToValidate, module) } deployment.waitingForModules = c.invalidModulesForDeployment(c.schemaSource.CanonicalView(), deployment, modulesToValidate) - if len(deployment.waitingForModules) > 0 { - deployment.publishInSchema = true - } + // For now let's always publish queued deployments in the schema + // This helps speed up the engine as dependencies can start building + // Originally this was only turned on if (deployment.waitingForModules > 0) + deployment.publishInSchema = true if c.tryDeployFromQueue(ctx, deployment, toDeploy, graph) { if deployment.changeset.Ok() { @@ -293,38 +270,48 @@ func (c *DeployCoordinator) processEvents(ctx context.Context) { toDeploy = append(toDeploy, deployment) } if deployment.publishInSchema { - c.publishUpdatedSchema(ctx, stdslices.Collect(maps.Keys(deployment.modules)), toDeploy, deploying) + c.publishUpdatedSchema(ctx, slices.Map(stdslices.Collect(maps.Keys(deployment.modules)), func(name string) schema.ModuleRefKey { + return schema.ModuleRefKey{Realm: c.projectConfig.Name, Module: name} + }), toDeploy, deploying) } case notification := <-events: var key key.Changeset - var updatedModules []string + var updatedModules []schema.ModuleRefKey switch e := notification.(type) { case *schema.ChangesetCommittedNotification: key = e.Changeset.Key - updatedModules = slices.Map(e.Changeset.InternalRealm().Modules, func(m *schema.Module) string { return m.Name }) - - for _, m := range e.Changeset.InternalRealm().RemovingModules { - if _, ok := slices.Find(updatedModules, func(s string) bool { return s == m.Name }); ok { - continue - } - c.engineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleRemoved{ - ModuleRemoved: &buildenginepb.ModuleRemoved{ - Module: m.Name, - }, - }, - } - } + // TODO: use e.Changeset.RealmChanges so external modules are handled + updatedModules = slices.Map(e.Changeset.InternalRealm().Modules, func(m *schema.Module) schema.ModuleRefKey { + return schema.ModuleRefKey{Realm: e.Changeset.InternalRealm().Name, Module: m.Name} + }) + + // TODO: bring this back + // for _, m := range e.Changeset.InternalRealm().RemovingModules { + // if _, ok := slices.Find(updatedModules, func(r schema.ModuleRefKey) bool { return r.Module == m.Name }); ok { + // continue + // } + + // c.engineUpdates <- &buildenginepb.EngineEvent{ + // Timestamp: timestamppb.Now(), + // Event: &buildenginepb.EngineEvent_ModuleRemoved{ + // ModuleRemoved: &buildenginepb.ModuleRemoved{ + // Module: m.Name, + // }, + // }, + // } + // } case *schema.ChangesetRollingBackNotification: key = e.Changeset.Key - updatedModules = slices.Map(e.Changeset.InternalRealm().Modules, func(m *schema.Module) string { return m.Name }) + // TODO: use e.Changeset.RealmChanges so external modules are handled + updatedModules = slices.Map(e.Changeset.InternalRealm().Modules, func(m *schema.Module) schema.ModuleRefKey { + return schema.ModuleRefKey{Realm: e.Changeset.InternalRealm().Name, Module: m.Name} + }) default: continue } tmp := []*pendingDeploy{} - graph, err := c.dependencyGrapher.Graph() + graph, err := c.dependencyGraphForDeploymentState(toDeploy, deploying) if err != nil { 
log.FromContext(ctx).Errorf(err, "could not build graph to order deployment") continue @@ -335,7 +322,7 @@ func (c *DeployCoordinator) processEvents(ctx context.Context) { } if d.publishInSchema { // already in published schema - updatedModules = []string{} + updatedModules = []schema.ModuleRefKey{} } return false }) @@ -354,6 +341,23 @@ func (c *DeployCoordinator) processEvents(ctx context.Context) { } } +func (c *DeployCoordinator) dependencyGraphForDeploymentState(toDeploy []*pendingDeploy, deploying []*pendingDeploy, extra ...*pendingDeploy) (map[string][]string, error) { + allDeploys := append([]*pendingDeploy{}, toDeploy...) + allDeploys = append(allDeploys, deploying...) + allDeploys = append(allDeploys, extra...) + customDependencyProviders := map[string]customDependencyProvider{ + "builtin": func() []string { return []string{} }, + } + for _, deployment := range allDeploys { + for _, module := range deployment.modules { + customDependencyProviders[module.moduleName()] = func() []string { + return module.module.Dependencies(AlwaysIncludeBuiltin) + } + } + } + return Graph(customDependencyProviders, c.schemaSource.CanonicalView(), stdslices.Collect(maps.Keys(customDependencyProviders))...) +} + func (c *DeployCoordinator) tryDeployFromQueue(ctx context.Context, deployment *pendingDeploy, toDeploy []*pendingDeploy, depGraph map[string][]string) bool { logger := log.FromContext(ctx) if len(deployment.waitingForModules) > 0 { @@ -391,14 +395,8 @@ func (c *DeployCoordinator) tryDeployFromQueue(ctx context.Context, deployment * // No conflicts, lets deploy moduleNames := slices.Sort(slices.Map(stdslices.Collect(maps.Values(deployment.modules)), func(m *pendingModule) string { return m.moduleName() })) logger.Debugf("Deploying %s", strings.Join(moduleNames, ",")) + c.deployUpdates <- moduleDeployStartedEvent{modules: slices.Map(stdslices.Collect(maps.Values(deployment.modules)), func(m *pendingModule) string { return m.moduleName() })} for _, module := range deployment.modules { - c.engineUpdates <- &buildenginepb.EngineEvent{ - Event: &buildenginepb.EngineEvent_ModuleDeployStarted{ - ModuleDeployStarted: &buildenginepb.ModuleDeployStarted{ - Module: module.moduleName(), - }, - }, - } if repo, ok := deployment.replicas.Get(); ok { module.schema.ModRuntime().ModScaling().MinReplicas = repo } @@ -407,32 +405,7 @@ func (c *DeployCoordinator) tryDeployFromQueue(ctx context.Context, deployment * keyChan := make(chan result.Result[key.Changeset], 1) go func() { err := deploy(ctx, c.projectConfig.Name, slices.Map(stdslices.Collect(maps.Values(deployment.modules)), func(m *pendingModule) *schema.Module { return m.schema }), c.adminClient, keyChan, c.externalRealms) - if err != nil { - // Handle deployment failure - for _, module := range deployment.modules { - c.engineUpdates <- &buildenginepb.EngineEvent{ - Event: &buildenginepb.EngineEvent_ModuleDeployFailed{ - ModuleDeployFailed: &buildenginepb.ModuleDeployFailed{ - Module: module.moduleName(), - Errors: &langpb.ErrorList{ - Errors: errorToLangError(err), - }, - }, - }, - } - } - } else { - // Handle deployment success - for _, module := range deployment.modules { - c.engineUpdates <- &buildenginepb.EngineEvent{ - Event: &buildenginepb.EngineEvent_ModuleDeploySuccess{ - ModuleDeploySuccess: &buildenginepb.ModuleDeploySuccess{ - Module: module.moduleName(), - }, - }, - } - } - } + c.deployUpdates <- moduleDeployEndedEvent{modules: slices.Map(stdslices.Collect(maps.Values(deployment.modules)), func(module *pendingModule) string { return 
module.moduleName() }), err: err} deployment.err <- err for _, sup := range deployment.supercededModules { sup.err <- err @@ -548,7 +521,7 @@ func (c *DeployCoordinator) invalidModulesForDeployment(originalSch *schema.Sche return out } -func (c *DeployCoordinator) publishUpdatedSchema(ctx context.Context, updatedModules []string, toDeploy, deploying []*pendingDeploy) { +func (c *DeployCoordinator) publishUpdatedSchema(ctx context.Context, updatedModules []schema.ModuleRefKey, toDeploy, deploying []*pendingDeploy) { logger := log.FromContext(ctx) overridden := map[string]bool{} toRemove := map[string]bool{} @@ -611,13 +584,9 @@ func (c *DeployCoordinator) publishUpdatedSchema(ctx context.Context, updatedMod logger.Errorf(err, "Deploy coordinator could not publish invalid schema") return } - updated := map[string]bool{} - for _, m := range updatedModules { - updated[m] = true - } c.SchemaUpdates <- SchemaUpdatedEvent{ schema: sch, - updatedModules: updated, + updatedModules: updatedModules, } } diff --git a/internal/buildengine/deps.go b/internal/buildengine/deps.go new file mode 100644 index 0000000000..74cee3884b --- /dev/null +++ b/internal/buildengine/deps.go @@ -0,0 +1,71 @@ +package buildengine + +import ( + "golang.org/x/exp/maps" + + errors "github.com/alecthomas/errors" + "github.com/block/ftl/common/schema" + "github.com/block/ftl/common/slices" + imaps "github.com/block/ftl/internal/maps" +) + +type customDependencyProvider func() []string + +func GraphFromStates(states map[string]*moduleState, sch *schema.Schema, moduleNames ...string) (map[string][]string, error) { + return Graph(imaps.MapValues(states, func(name string, state *moduleState) customDependencyProvider { + return func() []string { return state.module.Dependencies(AlwaysIncludeBuiltin) } + }), sch, moduleNames...) +} + +// Graph returns the dependency graph for the given modules. +// +// If no modules are provided, the entire graph is returned. An error is returned if +// any dependencies are missing. +func Graph(customProviders map[string]customDependencyProvider, sch *schema.Schema, moduleNames ...string) (map[string][]string, error) { + out := map[string][]string{} + if len(moduleNames) == 0 { + moduleNames = maps.Keys(customProviders) + moduleNames = append(moduleNames, slices.Map(sch.InternalModules(), func(m *schema.Module) string { return m.Name })...) + } + for _, name := range moduleNames { + if err := buildGraph(customProviders, sch, name, out); err != nil { + return nil, errors.WithStack(err) + } + } + return out, nil +} + +func buildGraph(customProviders map[string]customDependencyProvider, sch *schema.Schema, moduleName string, out map[string][]string) error { + if moduleName == "builtin" { + out["builtin"] = []string{} + return nil + } + var deps []string + // Short-circuit previously explored nodes + if _, ok := out[moduleName]; ok { + return nil + } + foundModule := false + if customProvider, ok := customProviders[moduleName]; ok { + foundModule = true + deps = customProvider() + } + if !foundModule { + if sch, ok := slices.Find(sch.InternalModules(), func(m *schema.Module) bool { return m.Name == moduleName }); ok { + foundModule = true + deps = append(deps, sch.Imports()...) + } + } + if !foundModule { + return errors.Errorf("module %q not found. 
does the module exist and is the ftl.toml file correct?", moduleName) + } + deps = slices.Unique(deps) + out[moduleName] = deps + for i := range deps { + dep := deps[i] + if err := buildGraph(customProviders, sch, dep, out); err != nil { + return errors.Wrapf(err, "module %q requires dependency %q", moduleName, dep) + } + } + return nil +} diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index 29e8c018f9..7026f85ef4 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -4,33 +4,25 @@ import ( "bytes" "context" "crypto/sha256" - "fmt" "runtime" - "sort" - "strings" - "sync" "time" "connectrpc.com/connect" - "github.com/alecthomas/atomic" errors "github.com/alecthomas/errors" "github.com/alecthomas/types/optional" "github.com/alecthomas/types/pubsub" - "github.com/puzpuzpuz/xsync/v3" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/types/known/timestamppb" adminpb "github.com/block/ftl/backend/protos/xyz/block/ftl/admin/v1" buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1" - langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" "github.com/block/ftl/common/log" "github.com/block/ftl/common/reflect" "github.com/block/ftl/common/schema" "github.com/block/ftl/common/slices" "github.com/block/ftl/internal/buildengine/languageplugin" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/dev" - imaps "github.com/block/ftl/internal/maps" "github.com/block/ftl/internal/moduleconfig" "github.com/block/ftl/internal/projectconfig" "github.com/block/ftl/internal/realm" @@ -39,80 +31,80 @@ import ( "github.com/block/ftl/internal/watch" ) -var _ rpc.Service = (*Engine)(nil) +// moduleWatcherFunc is a function that watches a module for changes +// +// It returns a way to create file transactions and a cancel function to stop watching the module +type moduleWatcherFunc func(ctx context.Context, config moduleconfig.ModuleConfig, internalEvents chan internalEvent) (transactionProviderFunc, context.CancelCauseFunc, error) -// moduleMeta is a wrapper around a module that includes the last build's start time. -type moduleMeta struct { - module Module - plugin *languageplugin.LanguagePlugin - events chan languageplugin.PluginEvent - configDefaults moduleconfig.CustomDefaults +// deployFunc is a function that might deploy a module. +// +// It returns true if the module is queued for deployment, or false otherwise. +type deployFunc func(ctx context.Context, module *pendingModule) (willDeploy bool) + +//sumtype:decl +type internalEvent interface { + internalEvent() } -// copyMetaWithUpdatedDependencies finds the dependencies for a module and returns a -// copy with those dependencies populated. 
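Graph and buildGraph in the new deps.go above are a memoised depth-first walk: each module's direct dependencies come either from a customDependencyProvider or from the schema's internal modules, get recorded in the output map, and are then recursed into, with already-visited nodes short-circuited. A self-contained sketch of the same traversal over a plain map, leaving out the schema and provider plumbing (the module names are made up):

package main

import (
	"fmt"
	"sort"
)

// graph records each module's direct dependencies and recurses into them,
// short-circuiting nodes that are already in the output map, just like
// buildGraph above (recording before recursing also keeps cycles finite).
func graph(deps map[string][]string, roots []string) (map[string][]string, error) {
	out := map[string][]string{}
	var visit func(name string) error
	visit = func(name string) error {
		if _, ok := out[name]; ok {
			return nil // already explored
		}
		d, ok := deps[name]
		if !ok {
			return fmt.Errorf("module %q not found", name)
		}
		out[name] = d
		for _, dep := range d {
			if err := visit(dep); err != nil {
				return fmt.Errorf("module %q requires dependency %q: %w", name, dep, err)
			}
		}
		return nil
	}
	for _, root := range roots {
		if err := visit(root); err != nil {
			return nil, err
		}
	}
	return out, nil
}

func main() {
	deps := map[string][]string{
		"builtin": {},
		"time":    {"builtin"},
		"echo":    {"time", "builtin"},
	}
	g, err := graph(deps, []string{"echo"})
	if err != nil {
		panic(err)
	}
	names := make([]string, 0, len(g))
	for name := range g {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Println(name, "->", g[name])
	}
}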
-func copyMetaWithUpdatedDependencies(ctx context.Context, m moduleMeta) (moduleMeta, error) { - logger := log.FromContext(ctx) - logger.Debugf("Extracting dependencies for %q", m.module.Config.Module) +type addMetasEvent struct { + preparedModules map[string]preparedModule +} - dependencies, err := m.plugin.GetDependencies(ctx, m.module.Config) - if err != nil { - return moduleMeta{}, errors.Wrapf(err, "could not get dependencies for %v", m.module.Config.Module) - } +func (addMetasEvent) internalEvent() {} - m.module = m.module.CopyWithDependencies(dependencies) - return m, nil +type removeMetaEvent struct { + config moduleconfig.UnvalidatedModuleConfig } -//sumtype:decl -type rebuildEvent interface { - rebuildEvent() +func (removeMetaEvent) internalEvent() {} + +type schemaUpdateEvent struct { + newSchema *schema.Schema + modulesWithInterfaceChanges []schema.ModuleRefKey + modulesWithBreakingChanges []schema.ModuleRefKey } -// rebuildRequestEvent is published when a module needs to be rebuilt when a module -// failed to build due to a change in dependencies. -type rebuildRequestEvent struct { +func (schemaUpdateEvent) internalEvent() {} + +type moduleNeedsToBuildEvent struct { module string } -func (rebuildRequestEvent) rebuildEvent() {} +func (moduleNeedsToBuildEvent) internalEvent() {} -// rebuildRequiredEvent is published when a module needs to be rebuilt when a module -// failed to build due to a change in dependencies. -type autoRebuildCompletedEvent struct { - module string - schema *schema.Module +type moduleBuildEndedEvent struct { + config moduleconfig.ModuleConfig + moduleSchema *schema.Module tmpDeployDir string deployPaths []string + err error } -func (autoRebuildCompletedEvent) rebuildEvent() {} +func (moduleBuildEndedEvent) internalEvent() {} + +type moduleDeployStartedEvent struct { + modules []string +} + +func (moduleDeployStartedEvent) internalEvent() {} + +type moduleDeployEndedEvent struct { + modules []string + err error +} + +func (moduleDeployEndedEvent) internalEvent() {} // Engine for building a set of modules. 
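The internalEvent variants introduced above follow the usual Go closed-sum-type idiom: an interface whose only method is unexported, so only types in the same package can satisfy it, and consumers dispatch with a type switch (the //sumtype:decl comment is the marker a sum-type exhaustiveness linter typically keys on). A minimal illustration with invented event types, not the ones from the patch:

package main

import (
	"errors"
	"fmt"
)

// event is a closed sum type: the unexported marker method keeps
// implementations confined to this package.
type event interface{ isEvent() }

type buildEnded struct {
	module string
	err    error
}

func (buildEnded) isEvent() {}

type deployStarted struct{ modules []string }

func (deployStarted) isEvent() {}

// handle dispatches on the concrete event type, the way an engine event loop
// would consume its internal event channel.
func handle(e event) {
	switch e := e.(type) {
	case buildEnded:
		if e.err != nil {
			fmt.Printf("build of %s failed: %v\n", e.module, e.err)
			return
		}
		fmt.Printf("build of %s succeeded\n", e.module)
	case deployStarted:
		fmt.Printf("deploying %v\n", e.modules)
	}
}

func main() {
	events := make(chan event, 4)
	events <- buildEnded{module: "echo"}
	events <- buildEnded{module: "time", err: errors.New("missing dependency")}
	events <- deployStarted{modules: []string{"echo"}}
	close(events)
	for e := range events {
		handle(e)
	}
}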
type Engine struct { - adminClient AdminClient - deployCoordinator *DeployCoordinator - moduleMetas *xsync.MapOf[string, moduleMeta] - externalRealms *xsync.MapOf[string, *schema.Realm] - projectConfig projectconfig.Config - moduleDirs []string - watcher *watch.Watcher // only watches for module toml changes - targetSchema atomic.Value[*schema.Schema] - cancel context.CancelCauseFunc - parallelism int - modulesToBuild *xsync.MapOf[string, bool] - buildEnv []string - startTime optional.Option[time.Time] - - // events coming in from plugins - pluginEvents chan languageplugin.PluginEvent - - // requests to rebuild modules due to dependencies changing or plugins dying - rebuildEvents chan rebuildEvent - - // internal channel for raw engine updates (does not include all state changes) - rawEngineUpdates chan *buildenginepb.EngineEvent - + adminClient AdminClient + projectConfig projectconfig.Config + moduleDirs []string + parallelism int + buildEnv []string + startTime optional.Option[time.Time] + + internalEvents chan internalEvent // topic to subscribe to engine events engineUpdates *pubsub.Topic[*buildenginepb.EngineEvent] @@ -124,6 +116,8 @@ type Engine struct { updatesService rpc.Service } +var _ rpc.Service = (*Engine)(nil) + func (e *Engine) StartServices(ctx context.Context) ([]rpc.Option, error) { services, err := e.updatesService.StartServices(ctx) if err != nil { @@ -171,92 +165,32 @@ func WithStartTime(startTime time.Time) Option { func New( ctx context.Context, adminClient AdminClient, - schemaSource *schemaeventsource.EventSource, projectConfig projectconfig.Config, moduleDirs []string, logChanges bool, options ...Option, ) (*Engine, error) { ctx = log.ContextWithLogger(ctx, log.FromContext(ctx).Scope("build-engine")) - rawEngineUpdates := make(chan *buildenginepb.EngineEvent, 128) e := &Engine{ - adminClient: adminClient, - projectConfig: projectConfig, - moduleDirs: moduleDirs, - moduleMetas: xsync.NewMapOf[string, moduleMeta](), - watcher: watch.NewWatcher(optional.Some(projectConfig.WatchModulesLockPath()), "ftl.toml", "**/*.sql"), - pluginEvents: make(chan languageplugin.PluginEvent, 128), - parallelism: runtime.NumCPU(), - modulesToBuild: xsync.NewMapOf[string, bool](), - rebuildEvents: make(chan rebuildEvent, 128), - rawEngineUpdates: rawEngineUpdates, - engineUpdates: pubsub.New[*buildenginepb.EngineEvent](), - arch: runtime.GOARCH, // Default to the local env, we attempt to read these from the cluster later - os: runtime.GOOS, - externalRealms: xsync.NewMapOf[string, *schema.Realm](), - } - - for name, cfg := range projectConfig.ExternalRealms { - realm, err := realm.GetExternalRealm(ctx, e.projectConfig.ExternalRealmPath(), name, cfg) - if err != nil { - return nil, errors.Wrapf(err, "failed to read external realm %s", name) - } - e.externalRealms.Store(realm.Name, realm) + adminClient: adminClient, + projectConfig: projectConfig, + moduleDirs: moduleDirs, + parallelism: runtime.NumCPU(), + engineUpdates: pubsub.New[*buildenginepb.EngineEvent](), + arch: runtime.GOARCH, // Default to the local env, we attempt to read these from the cluster later + os: runtime.GOOS, + internalEvents: make(chan internalEvent, 64), } - e.deployCoordinator = NewDeployCoordinator(ctx, adminClient, schemaSource, e, rawEngineUpdates, logChanges, projectConfig, e.externalRealms) for _, option := range options { option(e) } - ctx, cancel := context.WithCancelCause(ctx) - e.cancel = cancel - - configs, err := watch.DiscoverModules(ctx, moduleDirs) - if err != nil { - return nil, errors.Wrap(err, 
"could not find modules") - } - - err = CleanStubs(ctx, projectConfig.Root(), configs) - if err != nil { - return nil, errors.Wrap(err, "failed to clean stubs") - } - updateTerminalWithEngineEvents(ctx, e.engineUpdates) - go e.watchForPluginEvents(ctx) e.updatesService = e.startUpdatesService(ctx) - go e.watchForEventsToPublish(ctx, len(configs) > 0) - - wg := &errgroup.Group{} - for _, config := range configs { - wg.Go(func() error { - meta, err := e.newModuleMeta(ctx, config) - if err != nil { - return errors.WithStack(err) - } - meta, err = copyMetaWithUpdatedDependencies(ctx, meta) - if err != nil { - return errors.WithStack(err) - } - e.moduleMetas.Store(config.Module, meta) - e.modulesToBuild.Store(config.Module, true) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleAdded{ - ModuleAdded: &buildenginepb.ModuleAdded{ - Module: config.Module, - }, - }, - } - return nil - }) - } - if err := wg.Wait(); err != nil { - return nil, errors.WithStack(err) //nolint:wrapcheck - } if adminClient != nil { info, err := adminClient.ClusterInfo(ctx, connect.NewRequest(&adminpb.ClusterInfoRequest{})) if err != nil { @@ -266,595 +200,278 @@ func New( e.arch = info.Msg.Arch } } - // Save initial schema - initialEvent := <-e.deployCoordinator.SchemaUpdates - e.targetSchema.Store(initialEvent.schema) - if adminClient == nil { - return e, nil - } return e, nil } -// Close stops the Engine's schema sync. -func (e *Engine) Close() error { - e.cancel(errors.Wrap(context.Canceled, "build engine stopped")) - return nil -} - -func (e *Engine) GetSchema() (*schema.Schema, bool) { - sch := e.targetSchema.Load() - if sch == nil { - return nil, false +// Dev builds and deploys all local modules and watches for changes, redeploying as necessary. +func (e *Engine) Dev(ctx context.Context, period time.Duration, schemaSource *schemaeventsource.EventSource) error { + externalRealms := []*schema.Realm{} + for name, cfg := range e.projectConfig.ExternalRealms { + realm, err := realm.GetExternalRealm(ctx, e.projectConfig.ExternalRealmPath(), name, cfg) + if err != nil { + return errors.Wrapf(err, "failed to read external realm %s", name) + } + externalRealms = append(externalRealms, realm) } - return sch, true -} + // TODO: logchanges param? + deployCoordinator := NewDeployCoordinator(ctx, e.adminClient, schemaSource, e.internalEvents, false, e.projectConfig, externalRealms) + // Save initial schema + initialSchemaEvent := <-deployCoordinator.SchemaUpdates -func (e *Engine) GetModuleSchema(moduleName string) (*schema.Module, bool) { - sch := e.targetSchema.Load() - if sch == nil { - return nil, false - } - module, ok := slices.Find(sch.InternalModules(), func(m *schema.Module) bool { - return m.Name == moduleName + go watchForNewOrRemovedModules(ctx, e.projectConfig, e.moduleDirs, period, e.internalEvents) + go watchSchemaUpdates(ctx, initialSchemaEvent.schema, deployCoordinator.SchemaUpdates, e.internalEvents) + + // watch for module additions and revovals + _, err := e.processEvents(ctx, initialSchemaEvent.schema, false, moduleWatcherWithPeriod(period), buildModuleAndPublish, func(ctx context.Context, module *pendingModule) bool { + go deployCoordinator.deploy(ctx, map[string]*pendingModule{module.schema.Name: module}, optional.None[int32]()) + return true }) - if !ok { - return nil, false - } - return module, true + return errors.WithStack(err) } -// Graph returns the dependency graph for the given modules. 
-// -// If no modules are provided, the entire graph is returned. An error is returned if -// any dependencies are missing. -func (e *Engine) Graph(moduleNames ...string) (map[string][]string, error) { - out := map[string][]string{} - if len(moduleNames) == 0 { - e.moduleMetas.Range(func(name string, _ moduleMeta) bool { - moduleNames = append(moduleNames, name) - return true - }) - } - for _, name := range moduleNames { - if err := e.buildGraph(name, out); err != nil { - return nil, errors.WithStack(err) - } - } - return out, nil +func (e *Engine) Build(ctx context.Context, schemaSource *schemaeventsource.EventSource) error { + _, _, err := e.buildAndCollect(ctx, schemaSource) + return err } -func (e *Engine) buildGraph(moduleName string, out map[string][]string) error { - var deps []string - // Short-circuit previously explored nodes - if _, ok := out[moduleName]; ok { - return nil - } - foundModule := false - if meta, ok := e.moduleMetas.Load(moduleName); ok { - foundModule = true - deps = meta.module.Dependencies(AlwaysIncludeBuiltin) +func (e *Engine) BuildAndDeploy(ctx context.Context, schemaSource *schemaeventsource.EventSource, replicas optional.Option[int32]) error { + moduleStates, pendingModules, err := e.buildAndCollect(ctx, schemaSource) + // TODO: need a way to get module states... + if err != nil { + return err } - if !foundModule { - if sch, ok := e.GetModuleSchema(moduleName); ok { - foundModule = true - deps = append(deps, sch.Imports()...) - } + for _, module := range pendingModules { + e.engineUpdates.Publish(newModuleDeployStartedEvent(module.schema.Name)) } - if !foundModule { - return errors.Errorf("module %q not found. does the module exist and is the ftl.toml file correct?", moduleName) + deployCoordinator := NewDeployCoordinator(ctx, e.adminClient, schemaSource, e.internalEvents, true, e.projectConfig, nil) + if err := deployCoordinator.deploy(ctx, pendingModules, replicas); err != nil { + // TODO: handle } - deps = slices.Unique(deps) - out[moduleName] = deps - for i := range deps { - dep := deps[i] - if err := e.buildGraph(dep, out); err != nil { - return errors.Wrapf(err, "module %q requires dependency %q", moduleName, dep) - } + for _, module := range pendingModules { + e.engineUpdates.Publish(newModuleDeploySuccessEvent(module.schema.Name)) } + // TODO: pass in module states + e.engineUpdates.Publish(newEngineEndedEvent(moduleStates)) return nil } -// Import manually imports a schema for a module as if it were retrieved from -// the FTL controller. -func (e *Engine) Import(ctx context.Context, realmName string, moduleSch *schema.Module) { - sch := reflect.DeepCopy(e.targetSchema.Load()) - for _, realm := range sch.Realms { - if realm.Name != realmName { - continue - } - realm.Modules = slices.Filter(realm.Modules, func(m *schema.Module) bool { - return m.Name != moduleSch.Name +// buildAndCollect builds all modules and returns the module states and pending modules for deployment. +func (e *Engine) buildAndCollect(ctx context.Context, schemaSource *schemaeventsource.EventSource) (map[string]*moduleState, map[string]*pendingModule, error) { + sch := schemaSource.CanonicalView() + if _, ok := sch.FirstInternalRealm().Get(); !ok { + sch.Realms = append(sch.Realms, &schema.Realm{ + Name: e.projectConfig.Name, }) - realm.Modules = append(realm.Modules, moduleSch) - break } - e.targetSchema.Store(sch) -} + sch = sch.WithBuiltins() -// Build attempts to build all local modules. 
-func (e *Engine) Build(ctx context.Context) error { - return e.BuildWithCallback(ctx, nil) -} - -// BuildWithCallback attempts to build all local modules, and calls back with the result -func (e *Engine) BuildWithCallback(ctx context.Context, callback func(ctx context.Context, module Module, moduleSch *schema.Module, tmpDeployDir string, deployPaths []string) error) error { - schemas := make(chan *schema.Module, e.moduleMetas.Size()) - if err := e.buildWithCallback(ctx, func(ctx context.Context, module Module, moduleSch *schema.Module, tmpDeployDir string, deployPaths []string) error { - schemas <- moduleSch - if callback != nil { - err := callback(ctx, module, moduleSch, tmpDeployDir, deployPaths) - if err != nil { - return errors.Wrapf(err, "build callback failed") - } - } - return nil - }); err != nil { - return errors.WithStack(err) - } - close(schemas) + schemaUpdates := make(chan SchemaUpdatedEvent, 32) - realm := &schema.Realm{ - Name: e.projectConfig.Name, + configs, err := watch.DiscoverModules(ctx, e.moduleDirs) + if err != nil { + return nil, nil, errors.Wrap(err, "could not find modules") } - for moduleSch := range schemas { - realm.Modules = append(realm.Modules, moduleSch) + preparedModuleMap := newPreparedModulesForConfigs(ctx, configs, e.projectConfig) + if len(preparedModuleMap) > 0 { + e.internalEvents <- addMetasEvent{preparedModules: preparedModuleMap} } - e.targetSchema.Store(&schema.Schema{ - Realms: []*schema.Realm{realm}, - }) - return nil -} -// Each iterates over all local modules. -func (e *Engine) Each(fn func(Module) error) (err error) { - e.moduleMetas.Range(func(key string, value moduleMeta) bool { - if ferr := fn(value.module); ferr != nil { - err = errors.Wrapf(ferr, "%s", key) - return false - } - return true - }) - err = errors.WithStack(err) - return -} + go watchSchemaUpdates(ctx, reflect.DeepCopy(sch), schemaUpdates, e.internalEvents) -// Modules returns the names of all modules. -func (e *Engine) Modules() []string { - var moduleNames []string - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - moduleNames = append(moduleNames, name) - return true + pendingModules := map[string]*pendingModule{} + // watch for module additions and revovals + moduleStates, err := e.processEvents(ctx, reflect.DeepCopy(sch), true, nil, buildModuleAndPublish, func(ctx context.Context, module *pendingModule) bool { + realm := sch.FirstInternalRealm().MustGet() + realm.Modules = slices.Filter(realm.Modules, func(m *schema.Module) bool { + return module.moduleName() != m.Name + }) + realm.Modules = append(realm.Modules, module.schema) + schemaUpdates <- SchemaUpdatedEvent{ + schema: reflect.DeepCopy(sch), + updatedModules: []schema.ModuleRefKey{ + {Realm: realm.Name, Module: module.moduleName()}, + }, + } + pendingModules[module.moduleName()] = module + return false }) - return moduleNames -} - -// Dev builds and deploys all local modules and watches for changes, redeploying as necessary. -func (e *Engine) Dev(ctx context.Context, period time.Duration) error { - return errors.WithStack(e.watchForModuleChanges(ctx, period)) + if err != nil { + return nil, nil, errors.WithStack(err) + } + return moduleStates, pendingModules, nil } -// watchForModuleChanges watches for changes and all build start and event state changes. -func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration) error { +// watchForNewOrRemovedModules watches for new or removed modules in the project directory. 
+// +// It uses a watcher to monitor the module directories and sends events to the internalEvents channel when modules are added or removed. +func watchForNewOrRemovedModules(ctx context.Context, projectConfig projectconfig.Config, moduleDirs []string, period time.Duration, internalEvents chan internalEvent) { logger := log.FromContext(ctx) - - watchEvents := make(chan watch.WatchEvent, 128) - topic, err := e.watcher.Watch(ctx, period, e.moduleDirs) + watcher := watch.NewWatcher(optional.Some(projectConfig.WatchModulesLockPath())) + moduleListChanges := make(chan watch.WatchEvent, 16) + moduleListTopic, err := watcher.Watch(ctx, period, moduleDirs) if err != nil { - return errors.Wrap(err, "failed to start watcher") + logger.Errorf(err, "failed to start watcher for module directories %v", moduleDirs) + return } - topic.Subscribe(watchEvents) + moduleListTopic.Subscribe(moduleListChanges) - // Build and deploy all modules first. - if err := e.BuildAndDeploy(ctx, optional.None[int32](), true, false); err != nil { - logger.Errorf(err, "Initial build and deploy failed") - } + for event := range channels.IterContext(ctx, moduleListChanges) { + switch event := event.(type) { + case watch.WatchEventModulesAdded: + preparedModuleMap := newPreparedModulesForConfigs(ctx, event.Configs, projectConfig) + if len(preparedModuleMap) > 0 { + internalEvents <- addMetasEvent{preparedModules: preparedModuleMap} + } + case watch.WatchEventModuleRemoved: + internalEvents <- removeMetaEvent{config: event.Config} - // Update schema and set initial module hashes - for { - select { - case event := <-e.deployCoordinator.SchemaUpdates: - e.targetSchema.Store(event.schema) - continue - default: + case watch.WatchEventModuleChanged: + // Changes within a module are not handled here } - break } - moduleHashes := map[string][]byte{} - for _, sch := range e.targetSchema.Load().InternalModules() { - hash, err := computeModuleHash(sch) - if err != nil { - return errors.Wrapf(err, "compute hash for %s failed", sch.Name) - } - moduleHashes[sch.Name] = hash - } - - for { - select { - case <-ctx.Done(): - return errors.WithStack(ctx.Err()) - - case event, ok := <-watchEvents: - if !ok { - // Watcher stopped unexpectedly (channel closed). 
- logger.Debugf("Watch event channel closed, watcher likely stopped.") - if ctxErr := ctx.Err(); ctxErr != nil { - return errors.Wrap(ctxErr, "watcher stopped") - } - return errors.New("watcher stopped unexpectedly") - } - switch event := event.(type) { - case watch.WatchEventModuleAdded: - logger.Debugf("Module %q added", event.Config.Module) - config := event.Config - if _, exists := e.moduleMetas.Load(config.Module); !exists { - meta, err := e.newModuleMeta(ctx, config) - logger.Debugf("generated meta for %q", event.Config.Module) - if err != nil { - logger.Errorf(err, "could not add module %s", config.Module) - continue - } - e.moduleMetas.Store(config.Module, meta) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleAdded{ - ModuleAdded: &buildenginepb.ModuleAdded{ - Module: config.Module, - }, - }, - } - logger.Debugf("calling build and deploy %q", event.Config.Module) - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, config.Module); err != nil { - logger.Errorf(err, "Build and deploy failed for added module %s", config.Module) - } - } - case watch.WatchEventModuleRemoved: - err := e.deployCoordinator.terminateModuleDeployment(ctx, event.Config.Module) - if err != nil { - logger.Errorf(err, "terminate %s failed", event.Config.Module) - } - if meta, ok := e.moduleMetas.Load(event.Config.Module); ok { - meta.plugin.Updates().Unsubscribe(meta.events) - err := meta.plugin.Kill() - if err != nil { - logger.Errorf(err, "terminate %s plugin failed", event.Config.Module) - } - } - e.moduleMetas.Delete(event.Config.Module) - e.modulesToBuild.Delete(event.Config.Module) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleRemoved{ - ModuleRemoved: &buildenginepb.ModuleRemoved{ - Module: event.Config.Module, - }, - }, - } - case watch.WatchEventModuleChanged: - // ftl.toml file has changed - meta, ok := e.moduleMetas.Load(event.Config.Module) - if !ok { - logger.Warnf("Module %q not found", event.Config.Module) - continue - } - - updatedConfig, err := moduleconfig.LoadConfig(event.Config.Dir) - if err != nil { - logger.Errorf(err, "Could not load updated toml for %s", event.Config.Module) - continue - } - validConfig, err := updatedConfig.FillDefaultsAndValidate(meta.configDefaults, e.projectConfig) - if err != nil { - logger.Errorf(err, "Could not configure module config defaults for %s", event.Config.Module) - continue - } - meta.module.Config = validConfig - e.moduleMetas.Store(event.Config.Module, meta) - - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, event.Config.Module); err != nil { - logger.Errorf(err, "Build and deploy failed for updated module %s", event.Config.Module) - } - } - case event := <-e.deployCoordinator.SchemaUpdates: - e.targetSchema.Store(event.schema) - for _, module := range event.schema.InternalModules() { - if !event.updatedModules[module.Name] { - continue - } - existingHash, ok := moduleHashes[module.Name] - if !ok { - existingHash = []byte{} - } +} - hash, err := computeModuleHash(module) - if err != nil { - logger.Errorf(err, "compute hash for %s failed", module.Name) - continue - } +type preparedModule struct { + module Module + plugin *languageplugin.LanguagePlugin + configDefaults moduleconfig.CustomDefaults +} - if bytes.Equal(hash, existingHash) { - logger.Tracef("schema for %s has not changed", module.Name) - continue - } +func newPreparedModulesForConfigs(ctx 
context.Context, configs []moduleconfig.UnvalidatedModuleConfig, projectConfig projectconfig.Config) map[string]preparedModule { + logger := log.FromContext(ctx) + preparedModules := make(chan preparedModule, len(configs)) + group := errgroup.Group{} - moduleHashes[module.Name] = hash - - dependentModuleNames := e.getDependentModuleNames(module.Name) - dependentModuleNames = slices.Filter(dependentModuleNames, func(name string) bool { - // We don't update if this was already part of the same changeset - return !event.updatedModules[name] - }) - if len(dependentModuleNames) > 0 { - logger.Infof("%s's schema changed; processing %s", module.Name, strings.Join(dependentModuleNames, ", ")) //nolint:forbidigo - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, dependentModuleNames...); err != nil { - logger.Errorf(err, "Build and deploy failed for dependent modules of %s", module.Name) - } - } + for _, config := range configs { + // Creating a plugin takes a while, so we do this in parallel. + group.Go(func() error { + plugin, err := languageplugin.New(ctx, config.Dir, config.Language, config.Module) + if err != nil { + return errors.Wrapf(err, "could not create plugin for %s", config.Module) } - - case event := <-e.rebuildEvents: - events := []rebuildEvent{event} - readLoop: - for { - select { - case event := <-e.rebuildEvents: - events = append(events, event) - default: - break readLoop - } + // update config with defaults + customDefaults, err := languageplugin.GetModuleConfigDefaults(ctx, config.Language, config.Dir) + if err != nil { + return errors.Wrapf(err, "could not get defaults provider for %s", config.Module) } - // Batch generate stubs for all auto rebuilds - // - // This is normally part of each group in the build topology, but auto rebuilds do not go through that flow - builtModuleEvents := map[string]autoRebuildCompletedEvent{} - for _, event := range events { - event, ok := event.(autoRebuildCompletedEvent) - if !ok { - continue - } - builtModuleEvents[event.module] = event + validConfig, err := config.FillDefaultsAndValidate(customDefaults, projectConfig) + if err != nil { + return errors.Wrapf(err, "could not apply defaults for %s", config.Module) } - if len(builtModuleEvents) > 0 { - metasMap := map[string]moduleMeta{} - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - metasMap[name] = meta - return true - }) - builtSchemas := imaps.MapValues(builtModuleEvents, func(_ string, e autoRebuildCompletedEvent) *schema.Module { return e.schema }) - err = GenerateStubs(ctx, e.projectConfig.Root(), maps.Values(builtSchemas), metasMap) - if err != nil { - logger.Errorf(err, "Failed to generate stubs") - } - - // Sync references to stubs if needed by the runtime - err = e.syncNewStubReferences(ctx, builtSchemas, metasMap) - if err != nil { - logger.Errorf(err, "Failed to sync stub references") - } - - // Deploy modules - var modulesToDeploy = []*pendingModule{} - for _, event := range builtModuleEvents { - moduleToDeploy, ok := e.moduleMetas.Load(event.module) - if ok { - modulesToDeploy = append(modulesToDeploy, newPendingModule(moduleToDeploy.module, event.tmpDeployDir, event.deployPaths, event.schema)) - } - } - go func() { - _ = e.deployCoordinator.deploy(ctx, modulesToDeploy, optional.None[int32]()) //nolint:errcheck - }() + prep := preparedModule{ + module: newModule(validConfig), + plugin: plugin, + configDefaults: customDefaults, } + logger.Debugf("Extracting dependencies for %q", prep.module.Config.Module) - // Batch together all new builds 
requested - modulesToBuild := map[string]bool{} - for _, event := range events { - event, ok := event.(rebuildRequestEvent) - if !ok { - continue - } - modulesToBuild[event.module] = true - } - if len(modulesToBuild) > 0 { - if err := e.BuildAndDeploy(ctx, optional.None[int32](), false, false, maps.Keys(modulesToBuild)...); err != nil { - logger.Errorf(err, "Build and deploy failed for rebuild requested modules") - } + dependencies, err := prep.plugin.GetDependencies(ctx, prep.module.Config) + if err != nil { + return errors.Wrapf(err, "could not get dependencies for %v", prep.module.Config.Module) } + prep.module = prep.module.CopyWithDependencies(dependencies) + preparedModules <- prep + return nil + }) + } + if err := group.Wait(); err != nil { + logger.Errorf(err, "failed to create module metas for new modules") + } + preparedModuleMap := map[string]preparedModule{} +collectMetas: + for { + select { + case m := <-preparedModules: + preparedModuleMap[m.module.Config.Module] = m + default: + break collectMetas } } + return preparedModuleMap } -type moduleState int - -const ( - moduleStateBuildWaiting moduleState = iota - moduleStateExplicitlyBuilding - moduleStateAutoRebuilding - moduleStateBuilt - moduleStateDeployWaiting - moduleStateDeploying - moduleStateDeployed - moduleStateFailed -) +func moduleWatcherWithPeriod(period time.Duration) moduleWatcherFunc { + return func(ctx context.Context, config moduleconfig.ModuleConfig, internalEvents chan internalEvent) (transactionProviderFunc, context.CancelCauseFunc, error) { + patterns := config.Watch + patterns = append(patterns, "ftl.toml", "**/*.sql") + watcher := watch.NewWatcher(optional.None[string](), patterns...) -func (e *Engine) isIdle(moduleStates map[string]moduleState) bool { - if len(moduleStates) == 0 { - return true - } - for module, state := range moduleStates { - switch state { - case moduleStateExplicitlyBuilding, - moduleStateAutoRebuilding, - moduleStateDeploying: - return false + ctx, cancel := context.WithCancelCause(ctx) + updates, err := watcher.Watch(ctx, period, []string{config.Abs().Dir}) + if err != nil { + cancel(context.Canceled) + return nil, nil, errors.Wrapf(err, "failed to watch module directory %s", config.Dir) + } + events := make(chan watch.WatchEvent, 16) + updates.Subscribe(events) - case moduleStateFailed, - moduleStateDeployed, - moduleStateDeployWaiting, - moduleStateBuilt: - - case moduleStateBuildWaiting: - // If no deps have failed then this module is waiting to start building - deps := e.getDependentModuleNames(module) - failedDeps := slices.Filter(deps, func(dep string) bool { - if depState, ok := moduleStates[dep]; ok && depState == moduleStateFailed { - return true + go func() { + for event := range channels.IterContext(ctx, events) { + switch event.(type) { + case watch.WatchEventModulesAdded: + // not handled here + case watch.WatchEventModuleRemoved: + // not handled here + case watch.WatchEventModuleChanged: + internalEvents <- moduleNeedsToBuildEvent{module: config.Module} } - return false - }) - if len(failedDeps) == 0 { - return false } - } + }() + return func() watch.ModifyFilesTransaction { + return watcher.GetTransaction(config.Abs().Dir) + }, cancel, nil } - return true } -// watchForEventsToPublish listens for raw build events, collects state, and publishes public events to BuildUpdates topic. 
-func (e *Engine) watchForEventsToPublish(ctx context.Context, hasInitialModules bool) { +func watchSchemaUpdates(ctx context.Context, + initialSchema *schema.Schema, + schemaUpdates chan SchemaUpdatedEvent, + internalEvents chan internalEvent) { logger := log.FromContext(ctx) - - moduleErrors := map[string]*langpb.ErrorList{} - moduleStates := map[string]moduleState{} - - idle := true - var endTime time.Time - var becomeIdleTimer <-chan time.Time - - isFirstRound := hasInitialModules - - addTimestamp := func(evt *buildenginepb.EngineEvent) { - if evt.Timestamp == nil { - evt.Timestamp = timestamppb.Now() + moduleHashes := map[schema.ModuleRefKey][]byte{} + // TODO: do not just do internal modules + for _, realm := range initialSchema.Realms { + for _, module := range realm.Modules { + hash, err := computeModuleHash(module) + if err != nil { + logger.Errorf(err, "compute hash for %s failed", module.Name) + continue + } + moduleHashes[schema.ModuleRefKey{Realm: realm.Name, Module: module.Name}] = hash } } - for { - select { - case <-ctx.Done(): - return - - case <-becomeIdleTimer: - becomeIdleTimer = nil - if !e.isIdle(moduleStates) { + for event := range channels.IterContext(ctx, schemaUpdates) { + modulesWithInterfaceChanges := []schema.ModuleRefKey{} + materiallyChangedModules := []schema.ModuleRefKey{} + for _, moduleRef := range event.updatedModules { + moduleSch, ok := event.schema.Module(moduleRef.Realm, moduleRef.Module).Get() + if !ok { + logger.Logf(log.Error, "module %s not found in schema", moduleRef) continue } - idle = true - - if e.devMode && isFirstRound { - if len(moduleErrors) > 0 { - var errs []error - for module, errList := range moduleErrors { - if errList != nil && len(errList.Errors) > 0 { - moduleErr := errors.Errorf("%s: %s", module, langpb.ErrorListString(errList)) - errs = append(errs, moduleErr) - } - } - if len(errs) > 1 { - logger.Logf(log.Error, "Initial build failed:\n%s", strings.Join(slices.Map(errs, func(err error) string { - return fmt.Sprintf(" %s", err) - }), "\n")) - } else { - logger.Errorf(errors.Join(errs...), "Initial build failed") - } - } else if start, ok := e.startTime.Get(); ok { - e.startTime = optional.None[time.Time]() - logger.Infof("All modules deployed in %.2fs, watching for changes...", endTime.Sub(start).Seconds()) - } else { - logger.Infof("All modules deployed, watching for changes...") - } - } - isFirstRound = false - - modulesOutput := []*buildenginepb.EngineEnded_Module{} - for module := range moduleStates { - meta, ok := e.moduleMetas.Load(module) - if !ok { - continue - } - modulesOutput = append(modulesOutput, &buildenginepb.EngineEnded_Module{ - Module: module, - Path: meta.module.Config.Dir, - Errors: moduleErrors[module], - }) + hash, err := computeModuleHash(moduleSch) + if err != nil { + logger.Errorf(err, "compute hash for %s failed", moduleRef) + continue } - evt := &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_EngineEnded{ - EngineEnded: &buildenginepb.EngineEnded{ - Modules: modulesOutput, - }, - }, + existingHash, ok := moduleHashes[moduleRef] + if !ok { + existingHash = []byte{} } - addTimestamp(evt) - e.engineUpdates.Publish(evt) - - case evt := <-e.rawEngineUpdates: - switch rawEvent := evt.Event.(type) { - case *buildenginepb.EngineEvent_ModuleAdded: - - case *buildenginepb.EngineEvent_ModuleRemoved: - delete(moduleErrors, rawEvent.ModuleRemoved.Module) - delete(moduleStates, rawEvent.ModuleRemoved.Module) - - case *buildenginepb.EngineEvent_ModuleBuildWaiting: - 
moduleStates[rawEvent.ModuleBuildWaiting.Config.Name] = moduleStateBuildWaiting - - case *buildenginepb.EngineEvent_ModuleBuildStarted: - if idle { - idle = false - started := &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_EngineStarted{ - EngineStarted: &buildenginepb.EngineStarted{}, - }, - } - addTimestamp(started) - e.engineUpdates.Publish(started) - } - if rawEvent.ModuleBuildStarted.IsAutoRebuild { - moduleStates[rawEvent.ModuleBuildStarted.Config.Name] = moduleStateAutoRebuilding - } else { - moduleStates[rawEvent.ModuleBuildStarted.Config.Name] = moduleStateExplicitlyBuilding - } - delete(moduleErrors, rawEvent.ModuleBuildStarted.Config.Name) - logger.Module(rawEvent.ModuleBuildStarted.Config.Name).Scope("build").Debugf("Building...") - case *buildenginepb.EngineEvent_ModuleBuildFailed: - moduleStates[rawEvent.ModuleBuildFailed.Config.Name] = moduleStateFailed - moduleErrors[rawEvent.ModuleBuildFailed.Config.Name] = rawEvent.ModuleBuildFailed.Errors - moduleErr := errors.Errorf("%s", langpb.ErrorListString(rawEvent.ModuleBuildFailed.Errors)) - logger.Module(rawEvent.ModuleBuildFailed.Config.Name).Scope("build").Errorf(moduleErr, "Build failed") - case *buildenginepb.EngineEvent_ModuleBuildSuccess: - moduleStates[rawEvent.ModuleBuildSuccess.Config.Name] = moduleStateBuilt - delete(moduleErrors, rawEvent.ModuleBuildSuccess.Config.Name) - case *buildenginepb.EngineEvent_ModuleDeployWaiting: - moduleStates[rawEvent.ModuleDeployWaiting.Module] = moduleStateDeployWaiting - case *buildenginepb.EngineEvent_ModuleDeployStarted: - if idle { - idle = false - started := &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_EngineStarted{ - EngineStarted: &buildenginepb.EngineStarted{}, - }, - } - addTimestamp(started) - e.engineUpdates.Publish(started) - } - moduleStates[rawEvent.ModuleDeployStarted.Module] = moduleStateDeploying - delete(moduleErrors, rawEvent.ModuleDeployStarted.Module) - case *buildenginepb.EngineEvent_ModuleDeployFailed: - moduleStates[rawEvent.ModuleDeployFailed.Module] = moduleStateFailed - moduleErrors[rawEvent.ModuleDeployFailed.Module] = rawEvent.ModuleDeployFailed.Errors - case *buildenginepb.EngineEvent_ModuleDeploySuccess: - moduleStates[rawEvent.ModuleDeploySuccess.Module] = moduleStateDeployed - delete(moduleErrors, rawEvent.ModuleDeploySuccess.Module) + modulesWithInterfaceChanges = append(modulesWithInterfaceChanges, moduleRef) + if bytes.Equal(hash, existingHash) { + logger.Tracef("schema for %s has not changed", moduleRef) + continue } - addTimestamp(evt) - e.engineUpdates.Publish(evt) + moduleHashes[moduleRef] = hash + materiallyChangedModules = append(materiallyChangedModules, moduleRef) } - if !idle && e.isIdle(moduleStates) { - endTime = time.Now() - becomeIdleTimer = time.After(time.Millisecond * 200) + internalEvents <- schemaUpdateEvent{ + newSchema: reflect.DeepCopy(event.schema), + modulesWithInterfaceChanges: modulesWithInterfaceChanges, + modulesWithBreakingChanges: materiallyChangedModules, } } } @@ -868,652 +485,378 @@ func computeModuleHash(module *schema.Module) ([]byte, error) { return hasher.Sum(nil), nil } -func (e *Engine) getDependentModuleNames(moduleName string) []string { - dependentModuleNames := map[string]bool{} - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - for _, dep := range meta.module.Dependencies(AlwaysIncludeBuiltin) { - if dep == moduleName { - dependentModuleNames[name] = true - } - } - return true - }) - return 
maps.Keys(dependentModuleNames) -} - -// BuildAndDeploy attempts to build and deploy all local modules. -func (e *Engine) BuildAndDeploy(ctx context.Context, replicas optional.Option[int32], waitForDeployOnline bool, singleChangeset bool, moduleNames ...string) (err error) { - logger := log.FromContext(ctx) - if len(moduleNames) == 0 { - moduleNames = e.Modules() - } - if len(moduleNames) == 0 { - return nil - } - - defer func() { - if err == nil { - return - } - pendingInitialBuilds := []string{} - e.modulesToBuild.Range(func(name string, value bool) bool { - if value { - pendingInitialBuilds = append(pendingInitialBuilds, name) - } - return true - }) - - // Print out all modules that have yet to build if there are any errors - if len(pendingInitialBuilds) > 0 { - logger.Infof("Modules waiting to build: %s", strings.Join(pendingInitialBuilds, ", ")) - } - }() - - modulesToDeploy := [](*pendingModule){} - buildErr := e.buildWithCallback(ctx, func(buildCtx context.Context, module Module, moduleSch *schema.Module, tmpDeployDir string, deployPaths []string) error { - e.modulesToBuild.Store(module.Config.Module, false) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Event: &buildenginepb.EngineEvent_ModuleDeployWaiting{ - ModuleDeployWaiting: &buildenginepb.ModuleDeployWaiting{ - Module: module.Config.Module, - }, - }, - } - pendingDeployModule := newPendingModule(module, tmpDeployDir, deployPaths, moduleSch) - if singleChangeset { - modulesToDeploy = append(modulesToDeploy, pendingDeployModule) - return nil - } - deployErr := make(chan error, 1) - go func() { - deployErr <- e.deployCoordinator.deploy(ctx, []*pendingModule{pendingDeployModule}, replicas) - }() - if waitForDeployOnline { - return errors.WithStack(<-deployErr) - } - return nil - }, moduleNames...) - if buildErr != nil { - return errors.WithStack(buildErr) - } +// TODO: combine with moduleMeta? Make sure moduleState does not escape... 
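+//
+// moduleState is the engine's in-memory record for a single local module:
+// needsToBuild marks the module dirty after file or schema changes;
+// lastEvent holds the most recent EngineEvent published for the module and drives
+// build scheduling and idle detection (see modulesReadyToBuild and isIdle);
+// cancelModuleWatch stops the per-module file watcher when the module is removed;
+// transactionProvider, when present, supplies the watch.ModifyFilesTransaction
+// handed to builds (otherwise watch.NoOpFilesTransation is used).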
+type moduleState struct { + module Module + plugin *languageplugin.LanguagePlugin + configDefaults moduleconfig.CustomDefaults - deployGroup := &errgroup.Group{} - deployGroup.Go(func() error { - // Wait for all build attempts to complete - if singleChangeset { - // Queue the modules for deployment instead of deploying directly - return errors.WithStack(e.deployCoordinator.deploy(ctx, modulesToDeploy, replicas)) - } - return nil - }) - if waitForDeployOnline { - err := deployGroup.Wait() - return errors.WithStack(err) //nolint:wrapcheck - } - return nil + needsToBuild bool + lastEvent *buildenginepb.EngineEvent + cancelModuleWatch context.CancelCauseFunc + transactionProvider optional.Option[transactionProviderFunc] } -type buildCallback func(ctx context.Context, module Module, moduleSch *schema.Module, tmpDeployDir string, deployPaths []string) error - -func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, moduleNames ...string) error { +func (e *Engine) processEvents(ctx context.Context, initialSchema *schema.Schema, endWhenIdle bool, moduleWatcher moduleWatcherFunc, builder buildFunc, deployer deployFunc) (map[string]*moduleState, error) { logger := log.FromContext(ctx) - if len(moduleNames) == 0 { - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - moduleNames = append(moduleNames, name) - return true - }) - } + sch := initialSchema + moduleStates := map[string]*moduleState{} - mustBuildChan := make(chan moduleconfig.ModuleConfig, len(moduleNames)) - wg := errgroup.Group{} - for _, name := range moduleNames { - wg.Go(func() error { - meta, ok := e.moduleMetas.Load(name) - if !ok { - return errors.Errorf("module %q not found", name) - } + idle := true + // var endTime time.Time + var becomeIdleTimer <-chan time.Time - meta, err := copyMetaWithUpdatedDependencies(ctx, meta) - if err != nil { - return errors.Wrapf(err, "could not get dependencies for %s", name) + for { + events := []internalEvent{} + select { + case <-becomeIdleTimer: + becomeIdleTimer = nil + if !isIdle(moduleStates) { + continue } + idle = true + // TODO: pass in module errors + e.engineUpdates.Publish(newEngineEndedEvent(moduleStates)) - e.moduleMetas.Store(name, meta) - mustBuildChan <- meta.module.Config - return nil - }) - } - if err := wg.Wait(); err != nil { - return errors.WithStack(err) //nolint:wrapcheck - } - close(mustBuildChan) - mustBuild := map[string]bool{} - jvm := false - for config := range mustBuildChan { - if config.Language == "java" || config.Language == "kotlin" { - jvm = true - } - mustBuild[config.Module] = true - proto, err := langpb.ModuleConfigToProto(config.Abs()) - if err != nil { - logger.Errorf(err, "failed to marshal module config") + if endWhenIdle { + return moduleStates, nil + } continue + case event := <-e.internalEvents: + events = append(events, event) + case <-ctx.Done(): + return nil, errors.Wrap(ctx.Err(), "context cancelled while waiting for events") } - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ - ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{ - Config: proto, - }, - }, + + drainEvents: + for { + select { + case event := <-e.internalEvents: + events = append(events, event) + case <-ctx.Done(): + return nil, errors.Wrap(ctx.Err(), "context cancelled while waiting for events") + default: + break drainEvents + } } - } - if jvm { - // Huge hack that is just for development - // In release builds this is a noop - // This makes sure the JVM 
jars are up to date when running from source - buildRequiredJARS(ctx) - } - graph, err := e.Graph(moduleNames...) - if err != nil { - return errors.WithStack(err) - } - builtModules := map[string]*schema.Module{ - "builtin": schema.Builtins(), - } + // + // Process each event and update internal state + // - metasMap := map[string]moduleMeta{} - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - metasMap[name] = meta - return true - }) - err = GenerateStubs(ctx, e.projectConfig.Root(), maps.Values(builtModules), metasMap) - if err != nil { - return errors.WithStack(err) - } - - topology, topoErr := TopologicalSort(graph) - if topoErr != nil { - var dependencyCycleErr DependencyCycleError - if !errors.As(topoErr, &dependencyCycleErr) { - return errors.WithStack(topoErr) - } - if err := e.handleDependencyCycleError(ctx, dependencyCycleErr, graph, callback); err != nil { - return errors.WithStack(errors.Join(err, topoErr)) - } - return errors.WithStack(topoErr) - } - errCh := make(chan error, 1024) - for _, group := range topology { - knownSchemas := map[string]*schema.Module{} - err := e.gatherSchemas(builtModules, knownSchemas) - if err != nil { - return errors.WithStack(err) - } + for _, event := range events { + switch event := event.(type) { + case addMetasEvent: + if err := e.handleAddMetasEvent(ctx, event, sch, moduleStates, moduleWatcher); err != nil { + logger.Errorf(err, "failed to handle add metas event") + continue + } + case removeMetaEvent: + // TODO: this needs to go through deploy coordinator + if state, ok := moduleStates[event.config.Module]; ok && state.cancelModuleWatch != nil { + state.cancelModuleWatch(errors.Wrap(context.Canceled, "module removed")) + } + delete(moduleStates, event.config.Module) + e.engineUpdates.Publish(newModuleRemovedEvent(event.config.Module)) - // Collect schemas to be inserted into "built" map for subsequent groups. 
- schemas := make(chan *schema.Module, len(group)) + case schemaUpdateEvent: + sch = event.newSchema + if err := e.handleSchemaUpdateEvent(ctx, event, sch, moduleStates); err != nil { + logger.Errorf(err, "failed to handle schema update event") + continue + } + case moduleNeedsToBuildEvent: + if state, ok := moduleStates[event.module]; ok { + state.needsToBuild = true + } - wg := errgroup.Group{} - wg.SetLimit(e.parallelism) + case moduleBuildEndedEvent: + if err := e.handleBuildEndedEvent(ctx, event, moduleStates, deployer); err != nil { + logger.Errorf(err, "failed to handle build ended event") + continue + } - logger.Debugf("Building group: %v", group) - for _, moduleName := range group { - wg.Go(func() error { - logger := log.FromContext(ctx).Module(moduleName).Scope("build") - ctx := log.ContextWithLogger(ctx, logger) - err := e.tryBuild(ctx, mustBuild, moduleName, builtModules, schemas, callback) - if err != nil { - errCh <- err + case moduleDeployStartedEvent: + for _, module := range event.modules { + state, ok := moduleStates[module] + if !ok { + logger.Logf(log.Error, "module %s not found in module states", module) + continue + } + extEvent := newModuleDeployStartedEvent(module) + state.lastEvent = extEvent + e.engineUpdates.Publish(extEvent) + } + case moduleDeployEndedEvent: + for _, module := range event.modules { + state, ok := moduleStates[module] + if !ok { + logger.Logf(log.Error, "module %s not found in module states", module) + continue + } + var extEvent *buildenginepb.EngineEvent + if event.err != nil { + extEvent = newModuleDeployFailedEvent(module, event.err) + } else { + extEvent = newModuleDeploySuccessEvent(module) + } + state.lastEvent = extEvent + e.engineUpdates.Publish(extEvent) } - return nil - }) + } } - err = wg.Wait() + // + // Kick off any builds that we can + // + modulesReadyToBuild, deps, err := modulesReadyToBuild(ctx, sch, moduleStates) if err != nil { - return errors.WithStack(err) + logger.Errorf(err, "failed to get modules ready to build") + continue } - - // Now this group is built, collect all the schemas. 
- close(schemas) - newSchemas := []*schema.Module{} - for sch := range schemas { - builtModules[sch.Name] = sch - newSchemas = append(newSchemas, sch) + if idle && len(modulesReadyToBuild) > 0 { + e.engineUpdates.Publish(newEngineStartedEvent()) + idle = false } - - err = GenerateStubs(ctx, e.projectConfig.Root(), newSchemas, metasMap) - if err != nil { - return errors.WithStack(err) + if err := e.handleAnyModulesReadyToBuild(ctx, sch, moduleStates, modulesReadyToBuild, deps, builder); err != nil { + logger.Errorf(err, "failed to handle any modules ready to build") } - // Sync references to stubs if needed by the runtime - err = e.syncNewStubReferences(ctx, builtModules, metasMap) - if err != nil { - return errors.WithStack(err) + if !idle && isIdle(moduleStates) { + // endTime = time.Now() + becomeIdleTimer = time.After(time.Millisecond * 200) } } - - close(errCh) - allErrors := []error{} - for err := range errCh { - allErrors = append(allErrors, err) - } - - if len(allErrors) > 0 { - return errors.WithStack(errors.Join(allErrors...)) - } - - return nil } -func (e *Engine) handleDependencyCycleError(ctx context.Context, depErr DependencyCycleError, graph map[string][]string, callback buildCallback) error { - // Mark each cylic module as having an error - for _, module := range depErr.Modules { - meta, ok := e.moduleMetas.Load(module) - if !ok { - return errors.Errorf("module %q not found in dependency cycle", module) - } - configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) - if err != nil { - return errors.Wrap(err, "failed to marshal module config") - } - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildFailed{ - ModuleBuildFailed: &buildenginepb.ModuleBuildFailed{ - Config: configProto, - Errors: &langpb.ErrorList{ - Errors: []*langpb.Error{ - { - Msg: depErr.Error(), - Level: langpb.Error_ERROR_LEVEL_ERROR, - Type: langpb.Error_ERROR_TYPE_FTL, - }, - }, - }, - }, - }, +func (e *Engine) handleAddMetasEvent(ctx context.Context, event addMetasEvent, sch *schema.Schema, moduleStates map[string]*moduleState, moduleWatcher moduleWatcherFunc) error { + newLanguages := map[string]bool{} + for _, preparedModule := range event.preparedModules { + newLanguage := !newLanguages[preparedModule.module.Config.Language] && len(slices.Filter(maps.Values(moduleStates), func(m *moduleState) bool { + return m.module.Config.Language == preparedModule.module.Config.Language + })) == 0 + if newLanguage { + newLanguages[preparedModule.module.Config.Language] = true } } - - // Build the remaining modules - remaining := slices.Filter(maps.Keys(graph), func(module string) bool { - return !slices.Contains(depErr.Modules, module) && module != "builtin" - }) - if len(remaining) == 0 { - return nil + if len(newLanguages) > 0 { + // clean stubs for the language if no modules are present + // TODO: does this clean more than language specific stuff? + CleanStubs(ctx, e.projectConfig.Root(), maps.Keys(newLanguages)...) } - remainingModulesErr := e.buildWithCallback(ctx, callback, remaining...) 
- - wg := &sync.WaitGroup{} - for _, module := range depErr.Modules { - // Make sure each module in dependency cycle has an active build stream so changes to dependencies are detected - wg.Add(1) - go func() { - defer wg.Done() + newStates := map[string]*moduleState{} + for _, preparedModule := range event.preparedModules { + name := preparedModule.module.Config.Module + newLanguage := len(slices.Filter(maps.Values(moduleStates), func(m *moduleState) bool { + return m.module.Config.Language == preparedModule.module.Config.Language + })) == 0 + extEvent, err := newModuleBuildWaitingEvent(preparedModule.module.Config) + if err != nil { + return errors.Wrapf(err, "failed to watch module %s", name) + } - ignoredSchemas := make(chan *schema.Module, 1) - fakeDeps := map[string]*schema.Module{ - "builtin": schema.Builtins(), + var cancelModuleWatch context.CancelCauseFunc + var transactionProvider optional.Option[transactionProviderFunc] + if moduleWatcher != nil { + txProvider, cancel, err := moduleWatcher(ctx, preparedModule.module.Config, e.internalEvents) + if err != nil { + return errors.Wrapf(err, "failed to watch module %s", name) } - for _, dep := range graph[module] { - if sch, ok := e.GetModuleSchema(dep); ok { - fakeDeps[dep] = sch - continue - } - // not build yet, probably due to dependency cycle - fakeDeps[dep] = &schema.Module{ - Name: dep, - Comments: []string{"Dependency not built yet due to dependency cycle"}, - } + cancelModuleWatch = cancel + transactionProvider = optional.Some(txProvider) + } + state := &moduleState{ + module: preparedModule.module, + plugin: preparedModule.plugin, + configDefaults: preparedModule.configDefaults, + needsToBuild: true, + lastEvent: extEvent, + cancelModuleWatch: cancelModuleWatch, + transactionProvider: transactionProvider, + } + moduleStates[name] = state + newStates[name] = state + e.engineUpdates.Publish(extEvent) + + if newLanguage { + if err := GenerateStubs(ctx, e.projectConfig.Root(), sch.InternalModules(), moduleStates); err != nil { + return errors.WithStack(err) } - _, _, _, _ = e.build(ctx, module, fakeDeps, ignoredSchemas) //nolint:errcheck - close(ignoredSchemas) - }() - } - wg.Wait() - return errors.WithStack(remainingModulesErr) -} - -func (e *Engine) tryBuild(ctx context.Context, mustBuild map[string]bool, moduleName string, builtModules map[string]*schema.Module, schemas chan *schema.Module, callback buildCallback) error { - logger := log.FromContext(ctx) - - if !mustBuild[moduleName] { - return errors.WithStack(e.mustSchema(ctx, moduleName, builtModules, schemas)) - } - - meta, ok := e.moduleMetas.Load(moduleName) - if !ok { - return errors.Errorf("module %q not found", moduleName) - } - - for _, dep := range meta.module.Dependencies(Raw) { - if _, ok := builtModules[dep]; !ok { - logger.Warnf("build skipped because dependency %q failed to build", dep) - return nil } } + // New modules need to know which stubs have already been generated + return errors.Wrapf(SyncStubReferences(ctx, e.projectConfig.Root(), slices.Map(sch.InternalModules(), func(m *schema.Module) string { return m.Name }), newStates, sch), "could not sync stub references after adding new modules") +} - configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) +func (e *Engine) handleSchemaUpdateEvent(ctx context.Context, event schemaUpdateEvent, sch *schema.Schema, moduleStates map[string]*moduleState) error { + deps, err := GraphFromStates(moduleStates, sch, slices.Map(event.modulesWithBreakingChanges, func(moduleRef schema.ModuleRefKey) string 
{ return moduleRef.Module })...) if err != nil { - return errors.Wrap(err, "failed to marshal module config") - } - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ - ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{ - Config: configProto, - IsAutoRebuild: false, - }, - }, + return errors.Wrapf(err, "failed to get dependencies") } - - moduleSch, tmpDeployDir, deployPaths, err := e.build(ctx, moduleName, builtModules, schemas) - if err == nil && callback != nil { - // load latest meta as it may have been updated - meta, ok = e.moduleMetas.Load(moduleName) - if !ok { - return errors.Errorf("module %q not found", moduleName) + for _, state := range moduleStates { + deps := deps[state.module.Config.Module] + if _, foundBreakingChange := slices.Find(deps, func(dep string) bool { + return slices.Contains(event.modulesWithBreakingChanges, schema.ModuleRefKey{Realm: sch.FirstInternalRealm().MustGet().Name, Module: dep}) + }); foundBreakingChange { + // mark all transitive dependencies as dirty + state.needsToBuild = true } - return errors.WithStack(callback(ctx, meta.module, moduleSch, tmpDeployDir, deployPaths)) } - return errors.WithStack(err) -} - -// Publish either the schema from the FTL controller, or from a local build. -func (e *Engine) mustSchema(ctx context.Context, moduleName string, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error { - if sch, ok := e.GetModuleSchema(moduleName); ok { - schemas <- sch - return nil + // TODO: not just internal modules + if err := GenerateStubs(ctx, e.projectConfig.Root(), slices.Map(event.modulesWithInterfaceChanges, func(moduleRef schema.ModuleRefKey) *schema.Module { + // TODO: remove MustGet() usage + return sch.Module(moduleRef.Realm, moduleRef.Module).MustGet() + }), moduleStates); err != nil { + return errors.Wrapf(err, "failed to generate stubs for updated modules") } - sch, _, _, err := e.build(ctx, moduleName, builtModules, schemas) - schemas <- sch - return errors.WithStack(err) + // All modules need to know which stubs have been generated + return SyncStubReferences(ctx, e.projectConfig.Root(), slices.Map(sch.InternalModules(), func(m *schema.Module) string { return m.Name }), moduleStates, sch) } -// Build a module and publish its schema. -// -// Assumes that all dependencies have been built and are available in "built". 
-func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) (moduleSch *schema.Module, tmpDeployDir string, deployPaths []string, err error) { - meta, ok := e.moduleMetas.Load(moduleName) +func (e *Engine) handleBuildEndedEvent(ctx context.Context, event moduleBuildEndedEvent, moduleStates map[string]*moduleState, deployer deployFunc) error { + logger := log.FromContext(ctx) + state, ok := moduleStates[event.config.Module] if !ok { - return nil, "", nil, errors.Errorf("module %q not found", moduleName) + return errors.Errorf("module %s not found in module states", event.config.Module) } - - sch := &schema.Schema{Realms: []*schema.Realm{{Modules: maps.Values(builtModules)}}} //nolint:exptostd - - configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) - if err != nil { - return nil, "", nil, errors.Wrap(err, "failed to marshal module config") - } - if meta.module.SQLError != nil { - meta.module = meta.module.CopyWithSQLErrors(nil) - e.moduleMetas.Store(moduleName, meta) + if event.err != nil { + logger.Scope(event.config.Module).Errorf(event.err, "Build failed") + extEvent, err := newModuleBuildFailedEvent(event.config, event.err) + if err != nil { + return errors.Wrapf(err, "failed to create build failed event for module %s", event.config.Module) + } + state.lastEvent = extEvent + e.engineUpdates.Publish(extEvent) + return nil } - moduleSchema, tmpDeployDir, deployPaths, err := build(ctx, e.projectConfig, meta.module, meta.plugin, languageplugin.BuildContext{ - Config: meta.module.Config, - Schema: sch, - Dependencies: meta.module.Dependencies(Raw), - BuildEnv: e.buildEnv, - Os: e.os, - Arch: e.arch, - }, e.devMode, e.devModeEndpointUpdates) - + extEvent, err := newModuleBuildSuccessEvent(event.config) if err != nil { - if errors.Is(err, errSQLError) { - // Keep sql error around so that subsequent auto rebuilds from the plugin keep the sql error - meta.module = meta.module.CopyWithSQLErrors(err) - e.moduleMetas.Store(moduleName, meta) - } - if errors.Is(err, errInvalidateDependencies) { - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ - ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{ - Config: configProto, - }, - }, - } - // Do not start a build directly as we are already building out a graph of modules. - // Instead we send to a chan so that it can be processed after. 
- e.rebuildEvents <- rebuildRequestEvent{module: moduleName} - return nil, "", nil, errors.WithStack(err) - } - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildFailed{ - ModuleBuildFailed: &buildenginepb.ModuleBuildFailed{ - Config: configProto, - IsAutoRebuild: false, - Errors: &langpb.ErrorList{ - Errors: errorToLangError(err), - }, - }, - }, - } - return nil, "", nil, errors.WithStack(err) + return errors.Wrapf(err, "failed to create build failed event for module %s", event.config.Module) } + state.lastEvent = extEvent + e.engineUpdates.Publish(extEvent) - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildSuccess{ - ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{ - Config: configProto, - IsAutoRebuild: false, - }, - }, + if deployer(ctx, newPendingModule(state.module, event.tmpDeployDir, event.deployPaths, event.moduleSchema)) { + extEvent := newModuleDeployWaitingEvent(event.config.Module) + state.lastEvent = extEvent + e.engineUpdates.Publish(extEvent) } - - schemas <- moduleSchema - return moduleSchema, tmpDeployDir, deployPaths, nil + return nil } -// Construct a combined schema for a module and its transitive dependencies. -func (e *Engine) gatherSchemas( - moduleSchemas map[string]*schema.Module, - out map[string]*schema.Module, -) error { - for _, sch := range e.targetSchema.Load().InternalModules() { - out[sch.Name] = sch +func modulesReadyToBuild(ctx context.Context, sch *schema.Schema, moduleStates map[string]*moduleState) (readyToBuild []string, deps map[string][]string, err error) { + deps, err = GraphFromStates(moduleStates, sch, maps.Keys(moduleStates)...) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to get dependencies for modules") } + modulesToBuild := slices.Filter(maps.Values(moduleStates), func(state *moduleState) bool { + if !state.needsToBuild { + return false + } + name := state.module.Config.Module - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - if _, ok := moduleSchemas[name]; ok { - out[name] = moduleSchemas[name] - } else { - // We don't want to use a remote schema if we have it locally - delete(out, name) + switch state.lastEvent.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildStarted, + *buildenginepb.EngineEvent_ModuleDeployWaiting, + *buildenginepb.EngineEvent_ModuleDeployStarted: + return false + default: + } + for _, dep := range deps[name] { + if depState, ok := moduleStates[dep]; ok { + if depState.needsToBuild { + return false + } + if depState.lastEvent.Event != nil && depState.lastEvent.GetModuleBuildStarted() != nil { + return false + } + } + if _, ok := sch.Module(sch.FirstInternalRealm().MustGet().Name, dep).Get(); !ok { + return false + } } return true }) - - return nil + return slices.Map(modulesToBuild, func(s *moduleState) string { + return s.module.Config.Module + }), deps, nil } -func (e *Engine) syncNewStubReferences(ctx context.Context, newModules map[string]*schema.Module, metasMap map[string]moduleMeta) error { - fullSchema := &schema.Schema{} //nolint:exptostd - for _, r := range e.targetSchema.Load().Realms { - realm := &schema.Realm{ - Name: r.Name, - External: r.External, +func (e *Engine) handleAnyModulesReadyToBuild(ctx context.Context, sch *schema.Schema, moduleStates map[string]*moduleState, modulesToBuild []string, deps map[string][]string, builder buildFunc) error { + buildCount := 
len(slices.Filter(maps.Values(moduleStates), func(state *moduleState) bool { + return state.lastEvent.Event != nil && state.lastEvent.GetModuleBuildStarted() != nil + })) + for _, name := range modulesToBuild { + if buildCount >= e.parallelism { + return nil } - if !realm.External { - realm.Modules = maps.Values(newModules) + state, ok := moduleStates[name] + if !ok { + return errors.Errorf("module %s not found in module states", name) } - - for _, module := range r.Modules { - if _, ok := newModules[module.Name]; !ok || realm.External { - realm.Modules = append(realm.Modules, module) + engineEvent, err := newModuleBuildStartedEvent(state.module.Config) + if err != nil { + return errors.Wrapf(err, "failed to create build started event for module %s", state.module.Config.Module) + } + transactionProvider, ok := state.transactionProvider.Get() + var fileTransaction watch.ModifyFilesTransaction + if ok { + fileTransaction = transactionProvider() + } else { + fileTransaction = watch.NoOpFilesTransation{} + } + state.needsToBuild = false + state.lastEvent = engineEvent + e.engineUpdates.Publish(engineEvent) + + strippedSch := reflect.DeepCopy(sch) + modulesToKeep := map[string]bool{} + visitModuleDependencies(state.module.Config.Module, modulesToKeep, deps) + for _, module := range strippedSch.InternalModules() { + if !modulesToKeep[module.Name] { + // remove module from schema + strippedSch.RemoveModule(strippedSch.FirstInternalRealm().MustGet().Name, module.Name) } } - sort.SliceStable(realm.Modules, func(i, j int) bool { - return realm.Modules[i].Name < realm.Modules[j].Name - }) - fullSchema.Realms = append(fullSchema.Realms, realm) - } - - return errors.WithStack(SyncStubReferences(ctx, - e.projectConfig.Root(), - slices.Map(fullSchema.InternalModules(), func(m *schema.Module) string { return m.Name }), - metasMap, - fullSchema)) -} -func (e *Engine) newModuleMeta(ctx context.Context, config moduleconfig.UnvalidatedModuleConfig) (moduleMeta, error) { - plugin, err := languageplugin.New(ctx, config.Dir, config.Language, config.Module) - if err != nil { - return moduleMeta{}, errors.Wrapf(err, "could not create plugin for %s", config.Module) + go builder(ctx, e.projectConfig, state.module, state.plugin, languageplugin.BuildContext{ + Config: state.module.Config, + Schema: strippedSch, + Dependencies: state.module.Dependencies(Raw), + BuildEnv: e.buildEnv, + Os: e.os, + Arch: e.arch, + }, e.devMode, e.devModeEndpointUpdates, fileTransaction, e.internalEvents) + buildCount++ } - events := make(chan languageplugin.PluginEvent, 64) - plugin.Updates().Subscribe(events) + return nil +} - // pass on plugin events to the main event channel - // make sure we do not pass on nil (chan closure) events - go func() { - for { - select { - case event := <-events: - if event == nil { - // chan closed - return - } - e.pluginEvents <- event - case <-ctx.Done(): - return +func visitModuleDependencies(module string, visited map[string]bool, deps map[string][]string) error { + moduleDeps := deps[module] + for _, dep := range moduleDeps { + if !visited[dep] { + visited[dep] = true + if err := visitModuleDependencies(dep, visited, deps); err != nil { + return err } } - }() - - // update config with defaults - customDefaults, err := languageplugin.GetModuleConfigDefaults(ctx, config.Language, config.Dir) - if err != nil { - return moduleMeta{}, errors.Wrapf(err, "could not get defaults provider for %s", config.Module) - } - validConfig, err := config.FillDefaultsAndValidate(customDefaults, e.projectConfig) - if err 
!= nil { - return moduleMeta{}, errors.Wrapf(err, "could not apply defaults for %s", config.Module) } - return moduleMeta{ - module: newModule(validConfig), - plugin: plugin, - events: events, - configDefaults: customDefaults, - }, nil + return nil } -// watchForPluginEvents listens for build updates from language plugins and reports them to the listener. -// These happen when a plugin for a module detects a change and automatically rebuilds. -func (e *Engine) watchForPluginEvents(originalCtx context.Context) { - for { - select { - case event := <-e.pluginEvents: - switch event := event.(type) { - case languageplugin.PluginBuildEvent, languageplugin.AutoRebuildStartedEvent, languageplugin.AutoRebuildEndedEvent: - buildEvent := event.(languageplugin.PluginBuildEvent) //nolint:forcetypeassert - logger := log.FromContext(originalCtx).Module(buildEvent.ModuleName()).Scope("build") - ctx := log.ContextWithLogger(originalCtx, logger) - meta, ok := e.moduleMetas.Load(buildEvent.ModuleName()) - if !ok { - logger.Warnf("module not found for build update") - continue - } - configProto, err := langpb.ModuleConfigToProto(meta.module.Config.Abs()) - if err != nil { - continue - } - switch event := buildEvent.(type) { - case languageplugin.AutoRebuildStartedEvent: - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildStarted{ - ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{ - Config: configProto, - IsAutoRebuild: true, - }, - }, - } - - case languageplugin.AutoRebuildEndedEvent: - moduleSch, tmpDeployDir, deployPaths, err := handleBuildResult(ctx, e.projectConfig, meta.module, event.Result, e.devMode, e.devModeEndpointUpdates, optional.None[*schema.Schema]()) - if err != nil { - if errors.Is(err, errInvalidateDependencies) { - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ - ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{ - Config: configProto, - }, - }, - } - // Do not block this goroutine by building a module here. - // Instead we send to a chan so that it can be processed elsewhere. 
- e.rebuildEvents <- rebuildRequestEvent{module: event.ModuleName()} - // We don't update the state to failed, as it is going to be rebuilt - continue - } - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildFailed{ - ModuleBuildFailed: &buildenginepb.ModuleBuildFailed{ - Config: configProto, - IsAutoRebuild: true, - Errors: &langpb.ErrorList{ - Errors: errorToLangError(err), - }, - }, - }, - } - continue - } - - e.rawEngineUpdates <- &buildenginepb.EngineEvent{ - Timestamp: timestamppb.Now(), - Event: &buildenginepb.EngineEvent_ModuleBuildSuccess{ - ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{ - Config: configProto, - IsAutoRebuild: true, - }, - }, - } - e.rebuildEvents <- autoRebuildCompletedEvent{module: event.ModuleName(), schema: moduleSch, tmpDeployDir: tmpDeployDir, deployPaths: deployPaths} - } - case languageplugin.PluginDiedEvent: - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - if meta.plugin != event.Plugin { - return true - } - logger := log.FromContext(originalCtx).Module(name) - logger.Errorf(event.Error, "Plugin died, recreating") - - c, err := moduleconfig.LoadConfig(meta.module.Config.Dir) - if err != nil { - logger.Errorf(err, "Could not recreate plugin: could not load config") - return false - } - newMeta, err := e.newModuleMeta(originalCtx, c) - if err != nil { - logger.Errorf(err, "Could not recreate plugin") - return false - } - e.moduleMetas.Store(name, newMeta) - e.rebuildEvents <- rebuildRequestEvent{module: name} - return false - }) - } - case <-originalCtx.Done(): - // kill all plugins - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - err := meta.plugin.Kill() - if err != nil { - log.FromContext(originalCtx).Errorf(err, "could not kill plugin") - } - return true - }) - return +func isIdle(moduleStates map[string]*moduleState) bool { + if len(moduleStates) == 0 { + return true + } + for _, state := range moduleStates { + switch state.lastEvent.Event.(type) { + case *buildenginepb.EngineEvent_ModuleBuildStarted, + *buildenginepb.EngineEvent_ModuleDeployStarted: + return false + default: } } + return true } diff --git a/internal/buildengine/events.go b/internal/buildengine/events.go new file mode 100644 index 0000000000..d2cbcc6c94 --- /dev/null +++ b/internal/buildengine/events.go @@ -0,0 +1,152 @@ +package buildengine + +import ( + errors "github.com/alecthomas/errors" + buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1" + langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" + "github.com/block/ftl/internal/moduleconfig" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func newModuleRemovedEvent(module string) *buildenginepb.EngineEvent { + return &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleRemoved{ + ModuleRemoved: &buildenginepb.ModuleRemoved{ + Module: module, + }, + }, + } +} + +func newModuleBuildWaitingEvent(config moduleconfig.ModuleConfig) (*buildenginepb.EngineEvent, error) { + proto, err := langpb.ModuleConfigToProto(config.Abs()) + if err != nil { + return nil, errors.Wrap(err, "failed to convert module config to proto") + } + return &buildenginepb.EngineEvent{ + Timestamp: timestamppb.Now(), + Event: &buildenginepb.EngineEvent_ModuleBuildWaiting{ + ModuleBuildWaiting: &buildenginepb.ModuleBuildWaiting{ + Config: proto, + }, + }, + }, nil +} + +func newModuleBuildStartedEvent(config moduleconfig.ModuleConfig) 
(*buildenginepb.EngineEvent, error) {
+ proto, err := langpb.ModuleConfigToProto(config.Abs())
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to convert module config to proto")
+ }
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleBuildStarted{
+ ModuleBuildStarted: &buildenginepb.ModuleBuildStarted{
+ Config: proto,
+ },
+ },
+ }, nil
+}
+
+func newModuleBuildFailedEvent(config moduleconfig.ModuleConfig, buildErr error) (*buildenginepb.EngineEvent, error) {
+ proto, err := langpb.ModuleConfigToProto(config.Abs())
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to convert module config to proto")
+ }
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleBuildFailed{
+ ModuleBuildFailed: &buildenginepb.ModuleBuildFailed{
+ Config: proto,
+ Errors: &langpb.ErrorList{Errors: errorToLangError(buildErr)},
+ },
+ },
+ }, nil
+}
+
+func newModuleBuildSuccessEvent(config moduleconfig.ModuleConfig) (*buildenginepb.EngineEvent, error) {
+ proto, err := langpb.ModuleConfigToProto(config.Abs())
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to convert module config to proto")
+ }
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleBuildSuccess{
+ ModuleBuildSuccess: &buildenginepb.ModuleBuildSuccess{
+ Config: proto,
+ },
+ },
+ }, nil
+}
+
+func newModuleDeployWaitingEvent(module string) *buildenginepb.EngineEvent {
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleDeployWaiting{
+ ModuleDeployWaiting: &buildenginepb.ModuleDeployWaiting{
+ Module: module,
+ },
+ },
+ }
+}
+
+func newModuleDeployStartedEvent(module string) *buildenginepb.EngineEvent {
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleDeployStarted{
+ ModuleDeployStarted: &buildenginepb.ModuleDeployStarted{
+ Module: module,
+ },
+ },
+ }
+}
+
+func newModuleDeployFailedEvent(module string, deployErr error) *buildenginepb.EngineEvent {
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleDeployFailed{
+ ModuleDeployFailed: &buildenginepb.ModuleDeployFailed{
+ Module: module,
+ Errors: &langpb.ErrorList{Errors: errorToLangError(deployErr)},
+ },
+ },
+ }
+}
+
+func newModuleDeploySuccessEvent(module string) *buildenginepb.EngineEvent {
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_ModuleDeploySuccess{
+ ModuleDeploySuccess: &buildenginepb.ModuleDeploySuccess{
+ Module: module,
+ },
+ },
+ }
+}
+
+func newEngineStartedEvent() *buildenginepb.EngineEvent {
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_EngineStarted{
+ EngineStarted: &buildenginepb.EngineStarted{},
+ },
+ }
+}
+
+func newEngineEndedEvent(moduleStates map[string]*moduleState) *buildenginepb.EngineEvent {
+ modulesOutput := []*buildenginepb.EngineEnded_Module{}
+ for name, state := range moduleStates {
+ modulesOutput = append(modulesOutput, &buildenginepb.EngineEnded_Module{
+ Module: name,
+ Path: state.module.Config.Dir,
+ Errors: state.lastEvent.GetModuleBuildFailed().GetErrors(),
+ })
+ }
+ return &buildenginepb.EngineEvent{
+ Timestamp: timestamppb.Now(),
+ Event: &buildenginepb.EngineEvent_EngineEnded{
+ EngineEnded: &buildenginepb.EngineEnded{
+ Modules: modulesOutput,
+ },
+ },
+ }
+}
diff --git
a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index 424c5e919e..6b07aa9ec8 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -8,20 +8,14 @@ import ( "time" "connectrpc.com/connect" - "github.com/alecthomas/atomic" errors "github.com/alecthomas/errors" "github.com/alecthomas/types/optional" - "github.com/alecthomas/types/pubsub" - "github.com/alecthomas/types/result" langpb "github.com/block/ftl/backend/protos/xyz/block/ftl/language/v1" "github.com/block/ftl/common/builderrors" - "github.com/block/ftl/common/log" "github.com/block/ftl/common/schema" - "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/moduleconfig" "github.com/block/ftl/internal/projectconfig" - "github.com/block/ftl/internal/watch" ) const BuildLockTimeout = time.Minute @@ -44,50 +38,12 @@ type BuildResult struct { // File that the runner can use to pass info into the hot reload endpoint HotReloadEndpoint optional.Option[string] HotReloadVersion optional.Option[int64] - modifiedFiles []string + ModifiedFiles []string DebugPort int redeployNotRequired bool } -// PluginEvent is used to notify of updates from the plugin. -// -//sumtype:decl -type PluginEvent interface { - pluginEvent() -} - -type PluginBuildEvent interface { - PluginEvent - ModuleName() string -} - -// AutoRebuildStartedEvent is sent when the plugin starts an automatic rebuild. -type AutoRebuildStartedEvent struct { - Module string -} - -func (AutoRebuildStartedEvent) pluginEvent() {} -func (e AutoRebuildStartedEvent) ModuleName() string { return e.Module } - -// AutoRebuildEndedEvent is sent when the plugin ends an automatic rebuild. -type AutoRebuildEndedEvent struct { - Module string - Result result.Result[BuildResult] -} - -func (AutoRebuildEndedEvent) pluginEvent() {} -func (e AutoRebuildEndedEvent) ModuleName() string { return e.Module } - -// PluginDiedEvent is sent when the plugin dies. -type PluginDiedEvent struct { - // Plugins do not always have an associated module name, so we include the module - Plugin *LanguagePlugin - Error error -} - -func (PluginDiedEvent) pluginEvent() {} - // BuildContext contains contextual information needed to build. // // Any change to the build context would require a new build. @@ -114,8 +70,6 @@ func New(ctx context.Context, dir, language, name string) (p *LanguagePlugin, er func newPluginForTesting(ctx context.Context, client pluginClient) *LanguagePlugin { plugin := &LanguagePlugin{ client: client, - updates: pubsub.New[PluginEvent](), - bctx: atomic.New[*buildInfo](nil), buildRunning: &sync.Mutex{}, } go plugin.watchForCmdError(ctx) @@ -126,10 +80,6 @@ func newPluginForTesting(ctx context.Context, client pluginClient) *LanguagePlug type LanguagePlugin struct { client pluginClient - // cancels the run() context - updates *pubsub.Topic[PluginEvent] - watch *pubsub.Topic[watch.WatchEvent] - bctx *atomic.Value[*buildInfo] buildRunning *sync.Mutex } @@ -144,12 +94,6 @@ func (p *LanguagePlugin) Kill() error { return nil } -// Updates topic for all update events from the plugin -// The same topic must be returned each time this method is called -func (p *LanguagePlugin) Updates() *pubsub.Topic[PluginEvent] { - return p.updates -} - // GetDependencies returns the dependencies of the module. 
func (p *LanguagePlugin) GetDependencies(ctx context.Context, config moduleconfig.ModuleConfig) ([]string, error) { configProto, err := langpb.ModuleConfigToProto(config.Abs()) @@ -218,11 +162,10 @@ func (p *LanguagePlugin) SyncStubReferences(ctx context.Context, config moduleco // Build builds the module with the latest config and schema. // In dev mode, plugin is responsible for automatically rebuilding as relevant files within the module change, // and publishing these automatic builds updates to Updates(). -func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig.Config, stubsRoot string, bctx BuildContext, rebuildAutomatically bool) (BuildResult, error) { +func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig.Config, stubsRoot string, bctx BuildContext, devModeBuild bool) (BuildResult, error) { p.buildRunning.Lock() defer p.buildRunning.Unlock() startTime := time.Now() - p.bctx.Store(&buildInfo{projectConfig: projectConfig, stubsRoot: stubsRoot, bctx: bctx}) configProto, err := langpb.ModuleConfigToProto(bctx.Config.Abs()) if err != nil { @@ -233,7 +176,7 @@ func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig. result, err := p.client.build(ctx, connect.NewRequest(&langpb.BuildRequest{ ProjectConfig: langpb.ProjectConfigToProto(projectConfig), StubsRoot: stubsRoot, - DevModeBuild: rebuildAutomatically, + DevModeBuild: devModeBuild, BuildContext: &langpb.BuildContext{ ModuleConfig: configProto, Schema: schemaProto, @@ -245,58 +188,9 @@ func (p *LanguagePlugin) Build(ctx context.Context, projectConfig projectconfig. })) if err != nil { - return BuildResult{}, errors.Wrap(err, "failed to invoke build command") + return BuildResult{}, errors.WithStack(err) } - if rebuildAutomatically && p.watch == nil { - watcher := watch.NewWatcher(optional.None[string](), bctx.Config.Watch...) - updates, err := watcher.Watch(ctx, time.Second, []string{bctx.Config.Dir}) - if err != nil { - log.FromContext(ctx).Errorf(err, "Failed to watch module directory") - return buildResultFromProto(result.Msg, startTime) - } - p.watch = updates - go p.runWatch(ctx, watcher) - } - return buildResultFromProto(result.Msg, startTime) - -} - -func (p *LanguagePlugin) runWatch(ctx context.Context, watcher *watch.Watcher) { - logger := log.FromContext(ctx) - defer func() { - p.watch = nil - }() - updates := make(chan watch.WatchEvent) - p.watch.Subscribe(updates) - for i := range channels.IterContext(ctx, updates) { - if _, ok := i.(watch.WatchEventModuleChanged); ok { - info := p.bctx.Load() - tx := watcher.GetTransaction(info.bctx.Config.Dir) - err := tx.Begin() - if err != nil { - logger.Errorf(err, "Failed to start watch transaction") - } - p.updates.Publish(AutoRebuildStartedEvent{Module: info.bctx.Config.Module}) - br, err := p.Build(ctx, info.projectConfig, info.stubsRoot, info.bctx, true) - if err != nil { - p.updates.Publish(AutoRebuildEndedEvent{Module: info.bctx.Config.Module, Result: result.Err[BuildResult](err)}) - } else { - err = tx.ModifiedFiles(br.modifiedFiles...) 
- if err != nil { - if !br.redeployNotRequired { - p.updates.Publish(AutoRebuildEndedEvent{Module: info.bctx.Config.Module, Result: result.Err[BuildResult](err)}) - } - } else { - p.updates.Publish(AutoRebuildEndedEvent{Module: info.bctx.Config.Module, Result: result.Ok[BuildResult](br)}) - } - } - err = tx.End() - if err != nil { - logger.Errorf(err, "Failed to end watch transaction") - } - } - } } func buildResultFromProto(result *langpb.BuildResponse, startTime time.Time) (buildResult BuildResult, err error) { @@ -327,7 +221,7 @@ func buildResultFromProto(result *langpb.BuildResponse, startTime time.Time) (bu HotReloadEndpoint: optional.Ptr(buildSuccess.DevHotReloadEndpoint), HotReloadVersion: optional.Ptr(buildSuccess.DevHotReloadVersion), DebugPort: port, - modifiedFiles: buildSuccess.ModifiedFiles, + ModifiedFiles: buildSuccess.ModifiedFiles, redeployNotRequired: buildSuccess.RedeployNotRequired, }, nil case *langpb.BuildResponse_BuildFailure: @@ -350,17 +244,13 @@ func buildResultFromProto(result *langpb.BuildResponse, startTime time.Time) (bu StartTime: startTime, Errors: errs, InvalidateDependencies: buildFailure.InvalidateDependencies, - modifiedFiles: buildFailure.ModifiedFiles, + ModifiedFiles: buildFailure.ModifiedFiles, }, nil default: panic(fmt.Sprintf("unexpected result type %T", result)) } } -func contextID(config moduleconfig.ModuleConfig, counter int) string { - return fmt.Sprintf("%v-%v", config.Module, counter) -} - type buildInfo struct { projectConfig projectconfig.Config stubsRoot string @@ -374,10 +264,11 @@ func (p *LanguagePlugin) watchForCmdError(ctx context.Context) { // closed return } - p.updates.Publish(PluginDiedEvent{ - Plugin: p, - Error: err, - }) + // TODO: handle this + // p.updates.Publish(PluginDiedEvent{ + // Plugin: p, + // Error: err, + // }) case <-ctx.Done(): diff --git a/internal/buildengine/stubs.go b/internal/buildengine/stubs.go index 0d6b800454..8147d5ba09 100644 --- a/internal/buildengine/stubs.go +++ b/internal/buildengine/stubs.go @@ -19,7 +19,7 @@ var buildDirName = ".ftl" // GenerateStubs generates stubs for the given modules. // // Currently, only Go stubs are supported. Kotlin and other language stubs can be added in the future. -func GenerateStubs(ctx context.Context, projectRoot string, modules []*schema.Module, metas map[string]moduleMeta) error { +func GenerateStubs(ctx context.Context, projectRoot string, modules []*schema.Module, metas map[string]*moduleState) error { err := generateStubsForEachLanguage(ctx, projectRoot, modules, metas) if err != nil { return errors.WithStack(err) @@ -28,7 +28,7 @@ func GenerateStubs(ctx context.Context, projectRoot string, modules []*schema.Mo } // CleanStubs removes all generated stubs. -func CleanStubs(ctx context.Context, projectRoot string, configs []moduleconfig.UnvalidatedModuleConfig) error { +func CleanStubs(ctx context.Context, projectRoot string, languages ...string) error { logger := log.FromContext(ctx) logger.Debugf("Deleting all generated stubs") sharedFtlDir := filepath.Join(projectRoot, buildDirName) @@ -39,13 +39,7 @@ func CleanStubs(ctx context.Context, projectRoot string, configs []moduleconfig. return errors.Wrapf(err, "failed to remove %s", resourcesDir) } - // Figure out which languages we need to clean. 
- languages := make(map[string]struct{}) - for _, config := range configs { - languages[config.Language] = struct{}{} - } - - for lang := range languages { + for _, lang := range languages { stubsDir := filepath.Join(sharedFtlDir, lang, "modules") err := os.RemoveAll(stubsDir) if err != nil && !os.IsNotExist(err) { @@ -59,13 +53,13 @@ func CleanStubs(ctx context.Context, projectRoot string, configs []moduleconfig. // SyncStubReferences syncs the references in the generated stubs. // // For Go, this means updating all the go.work files to include all known modules in the shared stubbed modules directory. -func SyncStubReferences(ctx context.Context, projectRoot string, moduleNames []string, metas map[string]moduleMeta, view *schema.Schema) error { +func SyncStubReferences(ctx context.Context, projectRoot string, moduleNames []string, states map[string]*moduleState, view *schema.Schema) error { wg, wgctx := errgroup.WithContext(ctx) - for _, meta := range metas { + for _, state := range states { wg.Go(func() error { - stubsRoot := stubsLanguageDir(projectRoot, meta.module.Config.Language) - if err := meta.plugin.SyncStubReferences(wgctx, meta.module.Config, stubsRoot, moduleNames, view); err != nil { - return errors.Wrapf(err, "failed to sync go stub references for %s", meta.module.Config.Module) + stubsRoot := stubsLanguageDir(projectRoot, state.module.Config.Language) + if err := state.plugin.SyncStubReferences(wgctx, state.module.Config, stubsRoot, moduleNames, view); err != nil { + return errors.Wrapf(err, "failed to sync go stub references for %s", state.module.Config.Module) } return nil }) @@ -85,27 +79,27 @@ func stubsModuleDir(projectRoot, language, module string) string { return filepath.Join(stubsLanguageDir(projectRoot, language), module) } -func generateStubsForEachLanguage(ctx context.Context, projectRoot string, modules []*schema.Module, metas map[string]moduleMeta) error { +func generateStubsForEachLanguage(ctx context.Context, projectRoot string, modules []*schema.Module, moduleStates map[string]*moduleState) error { modulesByName := map[string]*schema.Module{} for _, module := range modules { modulesByName[module.Name] = module } - metasByLanguage := map[string][]moduleMeta{} - for _, meta := range metas { - metasByLanguage[meta.module.Config.Language] = append(metasByLanguage[meta.module.Config.Language], meta) + modulesByLanguage := map[string][]*moduleState{} + for _, state := range moduleStates { + modulesByLanguage[state.module.Config.Language] = append(modulesByLanguage[state.module.Config.Language], state) } wg, wgctx := errgroup.WithContext(ctx) - for language, metasForLang := range metasByLanguage { + for language, modulesForLang := range modulesByLanguage { for idx, module := range modules { // spread the load across plugins - assignedMeta := metasForLang[idx%len(metasForLang)] - config := metas[module.Name].module.Config + assignedMeta := modulesForLang[idx%len(modulesForLang)] + config := assignedMeta.module.Config + var nativeConfig optional.Option[moduleconfig.ModuleConfig] + if moduleState, ok := moduleStates[module.Name]; ok { + nativeConfig = optional.Some(moduleState.module.Config) + } wg.Go(func() error { path := stubsModuleDir(projectRoot, language, module.Name) - var nativeConfig optional.Option[moduleconfig.ModuleConfig] - if config.Module == "builtin" || config.Language != language { - nativeConfig = optional.Some(assignedMeta.module.Config) - } if err := assignedMeta.plugin.GenerateStubs(wgctx, path, module, config, nativeConfig); err != nil { return 
errors.WithStack(err) //nolint:wrapcheck } diff --git a/internal/watch/filehash.go b/internal/watch/filehash.go index 78b32847e4..c2668a5b69 100644 --- a/internal/watch/filehash.go +++ b/internal/watch/filehash.go @@ -78,9 +78,6 @@ func ComputeFileHashes(dir string, skipGitIgnoredFiles bool, patterns []string) return errors.WithStack(err) } if !matched { - if patterns[0] == "*" { - return errors.Errorf("file %s:%s does not match any: %s", rootDir, srcPath, patterns) - } return nil } fileHashes[srcPath] = hash diff --git a/internal/watch/watch.go b/internal/watch/watch.go index 6772672370..9aeed831bb 100644 --- a/internal/watch/watch.go +++ b/internal/watch/watch.go @@ -24,11 +24,11 @@ import ( // changed. type WatchEvent interface{ watchEvent() } -type WatchEventModuleAdded struct { - Config moduleconfig.UnvalidatedModuleConfig +type WatchEventModulesAdded struct { + Configs []moduleconfig.UnvalidatedModuleConfig } -func (WatchEventModuleAdded) watchEvent() {} +func (WatchEventModulesAdded) watchEvent() {} type WatchEventModuleRemoved struct { Config moduleconfig.UnvalidatedModuleConfig @@ -193,6 +193,7 @@ func (w *Watcher) detectChanges(ctx context.Context, topic *pubsub.Topic[WatchEv } // Compare the modules to the existing modules. + addedConfigs := []moduleconfig.UnvalidatedModuleConfig{} for _, config := range modulesByDir { if transactions, ok := w.moduleTransactions[config.Dir]; ok && len(transactions) > 0 { // Skip modules that currently have transactions @@ -212,7 +213,7 @@ func (w *Watcher) detectChanges(ctx context.Context, topic *pubsub.Topic[WatchEv if ctx.Err() != nil { return } - topic.Publish(WatchEventModuleAdded{Config: config}) + addedConfigs = append(addedConfigs, config) } else { // Compare hashes changes := CompareFileHashes(existingModule.Hashes, hashes) @@ -231,6 +232,9 @@ func (w *Watcher) detectChanges(ctx context.Context, topic *pubsub.Topic[WatchEv } } } + if len(addedConfigs) > 0 { + topic.Publish(WatchEventModulesAdded{Configs: addedConfigs}) + } } // ModifyFilesTransaction allows builds to modify files in a module without triggering a watch event. @@ -287,6 +291,9 @@ func (t *modifyFilesTransaction) ModifiedFiles(paths ...string) error { if !t.isActive { return errors.Errorf("can not modify file because transaction is not active: %v", paths) } + if len(paths) == 0 { + return nil + } t.watcher.mutex.Lock() defer t.watcher.mutex.Unlock() @@ -316,3 +323,19 @@ func (t *modifyFilesTransaction) ModifiedFiles(paths ...string) error { return nil } + +type NoOpFilesTransation struct{} + +var _ ModifyFilesTransaction = NoOpFilesTransation{} + +func (NoOpFilesTransation) Begin() error { + return nil +} + +func (NoOpFilesTransation) End() error { + return nil +} + +func (NoOpFilesTransation) ModifiedFiles(paths ...string) error { + return nil +}
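
The schema-stripping logic in the engine.go hunk above walks the dependency map transitively before each build and then removes every internal module that was not visited. The following standalone sketch is not part of the patch; the dependency map is invented purely for illustration of which modules survive for a single build.

package main

import "fmt"

// visitDeps mirrors visitModuleDependencies from the engine.go hunk: it marks
// every transitive dependency of module as visited, but not module itself.
func visitDeps(module string, visited map[string]bool, deps map[string][]string) {
	for _, dep := range deps[module] {
		if !visited[dep] {
			visited[dep] = true
			visitDeps(dep, visited, deps)
		}
	}
}

func main() {
	// Hypothetical dependency graph: payments -> users, ledger; ledger -> users.
	deps := map[string][]string{
		"payments": {"users", "ledger"},
		"ledger":   {"users"},
	}
	keep := map[string]bool{}
	visitDeps("payments", keep, deps)
	// Prints map[ledger:true users:true]: only the dependencies stay in the
	// stripped schema handed to the payments build; unrelated modules are removed.
	fmt.Println(keep)
}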
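
The constructors added in events.go are intended to bracket each build with started/failed/success events. A minimal sketch of that call pattern follows; it is not part of the patch, and publish and build are stand-ins for e.engineUpdates.Publish and the real builder goroutine.

package buildengine

import (
	buildenginepb "github.com/block/ftl/backend/protos/xyz/block/ftl/buildengine/v1"
	"github.com/block/ftl/internal/moduleconfig"
)

// reportBuild publishes a build-started event, runs build, then publishes
// either a build-failed or a build-success event using the constructors above.
func reportBuild(cfg moduleconfig.ModuleConfig, publish func(*buildenginepb.EngineEvent), build func() error) error {
	started, err := newModuleBuildStartedEvent(cfg)
	if err != nil {
		return err
	}
	publish(started)
	if buildErr := build(); buildErr != nil {
		failed, err := newModuleBuildFailedEvent(cfg, buildErr)
		if err != nil {
			return err
		}
		publish(failed)
		return buildErr
	}
	success, err := newModuleBuildSuccessEvent(cfg)
	if err != nil {
		return err
	}
	publish(success)
	return nil
}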
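
The NoOpFilesTransation type added to watch.go gives build paths that have no active watcher a transaction they can still call unconditionally, which is how the engine.go hunk uses it when no transactionProvider is set. A small sketch of that contract, as a hypothetical helper in the same package:

package watch

// runWithoutWatcher shows that the no-op transaction satisfies
// ModifyFilesTransaction, so callers need no special-casing when no watcher
// exists for a module.
func runWithoutWatcher() error {
	var tx ModifyFilesTransaction = NoOpFilesTransation{}
	if err := tx.Begin(); err != nil {
		return err
	}
	// An empty path list is also a no-op now, thanks to the early return added
	// to modifyFilesTransaction.ModifiedFiles in this diff.
	if err := tx.ModifiedFiles(); err != nil {
		return err
	}
	return tx.End()
}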
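
Because detectChanges now collects added modules and publishes one batched WatchEventModulesAdded instead of one event per module, subscribers range over Configs. A consumer-side sketch, again a hypothetical helper assuming only the event types shown above:

package watch

import "github.com/block/ftl/internal/moduleconfig"

// addedConfigs extracts every newly added module config from a slice of
// watch events, handling the batched WatchEventModulesAdded shape.
func addedConfigs(events []WatchEvent) []moduleconfig.UnvalidatedModuleConfig {
	var added []moduleconfig.UnvalidatedModuleConfig
	for _, event := range events {
		if e, ok := event.(WatchEventModulesAdded); ok {
			added = append(added, e.Configs...)
		}
	}
	return added
}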