From 2cc693d55e9cd9bfb80ce839a0421533416fbcea Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 30 May 2025 11:35:52 +1000 Subject: [PATCH 1/2] chore: introduce subscribeable interface --- backend/admin/service.go | 5 ++--- backend/console/console.go | 3 +-- backend/ingress/view.go | 2 +- backend/provisioner/service.go | 2 +- backend/schemamirror/schema_service.go | 3 +-- internal/channels/subscribe.go | 18 ++++++++++++++++++ .../deploymentcontext/deployment_context.go | 3 +-- internal/deploymentcontext/provider.go | 12 +++--------- internal/deploymentcontext/template_routing.go | 15 ++++++++------- internal/routing/routing.go | 16 ++++++++++------ internal/routing/verb_routing.go | 4 +--- .../schemaeventsource/schemaeventsource.go | 3 +++ 12 files changed, 50 insertions(+), 36 deletions(-) create mode 100644 internal/channels/subscribe.go diff --git a/backend/admin/service.go b/backend/admin/service.go index ef4f03c089..9133993d58 100644 --- a/backend/admin/service.go +++ b/backend/admin/service.go @@ -547,7 +547,7 @@ func (s *Service) ApplyChangeset(ctx context.Context, req *connect.Request[admin }); err != nil { return errors.Wrap(err, "failed to send changeset") } - for e := range channels.IterContext(ctx, s.source.Subscribe(ctx)) { + for e := range channels.IterSubscribable[schema.Notification](ctx, s.source) { switch event := e.(type) { case *schema.ChangesetFinalizedNotification: if event.Key != key { @@ -589,8 +589,7 @@ func (s *Service) ApplyChangeset(ctx context.Context, req *connect.Request[admin } func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest], resp *connect.ServerStream[ftlv1.PullSchemaResponse]) error { - events := s.source.Subscribe(ctx) - for event := range channels.IterContext(ctx, events) { + for event := range channels.IterSubscribable[schema.Notification](ctx, s.source) { switch e := event.(type) { case *schema.FullSchemaNotification: proto := e.ToProto() diff --git a/backend/console/console.go b/backend/console/console.go index 93262aa078..d96d8cfb0d 100644 --- a/backend/console/console.go +++ b/backend/console/console.go @@ -404,8 +404,7 @@ func (s *Service) StreamModules(ctx context.Context, req *connect.Request[consol return errors.WithStack(err) } - events := s.schemaEventSource.Subscribe(ctx) - for range channels.IterContext(ctx, events) { + for range channels.IterSubscribable[schema.Notification](ctx, s.schemaEventSource) { err = s.sendStreamModulesResp(stream) if err != nil { return errors.WithStack(err) diff --git a/backend/ingress/view.go b/backend/ingress/view.go index 4b1e0741b9..f696717509 100644 --- a/backend/ingress/view.go +++ b/backend/ingress/view.go @@ -20,7 +20,7 @@ func syncView(ctx context.Context, schemaEventSource *schemaeventsource.EventSou }) logger.Debugf("Starting routing sync from schema") go func() { - for range channels.IterContext(ctx, schemaEventSource.Subscribe(ctx)) { + for range channels.IterSubscribable[schema.Notification](ctx, schemaEventSource) { state := extractIngressRoutingEntries(schemaEventSource.CanonicalView()) out.Store(state) } diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index f6ed6e624b..751b8588f5 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -95,7 +95,7 @@ func Start( return errors.WithStack(err) } - for event := range channels.IterContext(ctx, svc.eventSource.Subscribe(ctx)) { + for event := range channels.IterSubscribable[schema.Notification](ctx, svc.eventSource) { go func() { switch e := event.(type) { case *schema.ChangesetCreatedNotification: diff --git a/backend/schemamirror/schema_service.go b/backend/schemamirror/schema_service.go index f8dbfbc7bb..9eec97dbe9 100644 --- a/backend/schemamirror/schema_service.go +++ b/backend/schemamirror/schema_service.go @@ -52,7 +52,6 @@ func (s *SchemaService) PullSchema(ctx context.Context, req *connect.Request[ftl if !s.mirror.receiving.Load() { return connect.NewError(connect.CodeUnavailable, errors.New("mirror is not receiving schema updates")) } - updates := s.mirror.eventSource.Subscribe(ctx) if err := stream.Send(&ftlv1.PullSchemaResponse{ Event: &schemapb.Notification{ Value: &schemapb.Notification_FullSchemaNotification{ @@ -67,7 +66,7 @@ func (s *SchemaService) PullSchema(ctx context.Context, req *connect.Request[ftl }); err != nil { return errors.Wrap(err, "failed to send initial schema") } - for event := range channels.IterContext(ctx, updates) { + for event := range channels.IterSubscribable[schema.Notification](ctx, s.mirror.eventSource) { if err := stream.Send(&ftlv1.PullSchemaResponse{ Event: schema.NotificationToProto(event), }); err != nil { diff --git a/internal/channels/subscribe.go b/internal/channels/subscribe.go new file mode 100644 index 0000000000..f305152afc --- /dev/null +++ b/internal/channels/subscribe.go @@ -0,0 +1,18 @@ +package channels + +import ( + "context" + "iter" +) + +// Subscribable represents an object that can be subscribed too until the context is cancelled +type Subscribable[T any] interface { + Subscribe(ctx context.Context) <-chan T +} + +// IterSubscribable subscribes to the object and iterates over events until the context is cancelled +// +// Check ctx.Err() != nil to detect if the context was cancelled. +func IterSubscribable[T any](ctx context.Context, subscribable Subscribable[T]) iter.Seq[T] { + return IterContext(ctx, subscribable.Subscribe(ctx)) +} diff --git a/internal/deploymentcontext/deployment_context.go b/internal/deploymentcontext/deployment_context.go index 243c7a9502..8f92e70aa9 100644 --- a/internal/deploymentcontext/deployment_context.go +++ b/internal/deploymentcontext/deployment_context.go @@ -31,8 +31,7 @@ type SecretsProvider func(ctx context.Context) (map[string][]byte, error) type ConfigProvider func(ctx context.Context) (map[string][]byte, error) type RouteProvider interface { - Subscribe() chan string - Unsubscribe(c chan string) + Subscribe(ctx context.Context) <-chan string Route(module string) string } diff --git a/internal/deploymentcontext/provider.go b/internal/deploymentcontext/provider.go index e183765317..7f528ea0d7 100644 --- a/internal/deploymentcontext/provider.go +++ b/internal/deploymentcontext/provider.go @@ -61,13 +61,8 @@ func (r *routeTableRouting) Route(module string) string { } // Subscribe implements RouteProvider. -func (r *routeTableRouting) Subscribe() chan string { - return r.table.Subscribe() -} - -// Unsubscribe implements RouteProvider. -func (r *routeTableRouting) Unsubscribe(c chan string) { - r.table.Unsubscribe(c) +func (r *routeTableRouting) Subscribe(ctx context.Context) <-chan string { + return r.table.Subscribe(ctx) } // NewProvider retrieves config, secrets and DSNs for a module. @@ -76,7 +71,7 @@ func NewProvider(key key.Deployment, routeProvider RouteProvider, moduleSchema * ret := make(chan DeploymentContext, 16) logger := log.FromContext(ctx) - updates := routeProvider.Subscribe() + updates := routeProvider.Subscribe(ctx) module := moduleSchema.Name // Initialize checksum to -1; a zero checksum does occur when the context contains no settings @@ -111,7 +106,6 @@ func NewProvider(key key.Deployment, routeProvider RouteProvider, moduleSchema * configs := map[string][]byte{} secrets := map[string][]byte{} go func() { - defer routeProvider.Unsubscribe(updates) for { h := sha.New() diff --git a/internal/deploymentcontext/template_routing.go b/internal/deploymentcontext/template_routing.go index e1a9a90416..bb3e192183 100644 --- a/internal/deploymentcontext/template_routing.go +++ b/internal/deploymentcontext/template_routing.go @@ -1,6 +1,7 @@ package deploymentcontext import ( + "context" "os" ) @@ -29,11 +30,11 @@ func (t *templateRouteTable) Route(module string) string { } // Subscribe implements deploymentcontext.RouteProvider. -func (t *templateRouteTable) Subscribe() chan string { - return make(chan string) -} - -// Unsubscribe implements deploymentcontext.RouteProvider. -func (t *templateRouteTable) Unsubscribe(c chan string) { - close(c) +func (t *templateRouteTable) Subscribe(ctx context.Context) <-chan string { + ret := make(chan string) + go func() { + <-ctx.Done() + close(ret) + }() + return ret } diff --git a/internal/routing/routing.go b/internal/routing/routing.go index 36f119ef4f..8112951232 100644 --- a/internal/routing/routing.go +++ b/internal/routing/routing.go @@ -15,6 +15,8 @@ import ( "github.com/block/ftl/internal/schema/schemaeventsource" ) +var _ channels.Subscribable[string] = (*RouteTable)(nil) + type RouteView struct { byDeployment map[string]*url.URL moduleToDeployment map[string]key.Deployment @@ -37,7 +39,7 @@ func New(ctx context.Context, changes *schemaeventsource.EventSource) *RouteTabl } func (r *RouteTable) run(ctx context.Context, changes *schemaeventsource.EventSource) { - for event := range channels.IterContext(ctx, changes.Subscribe(ctx)) { + for event := range channels.IterSubscribable[schema.Notification](ctx, changes) { logger := log.FromContext(ctx) logger.Tracef("Received schema event: %T", event) old := r.routes.Load() @@ -90,11 +92,13 @@ func (r RouteView) Schema() *schema.Schema { return r.schema } -func (r *RouteTable) Subscribe() chan string { - return r.changeNotification.Subscribe(nil) -} -func (r *RouteTable) Unsubscribe(s chan string) { - r.changeNotification.Unsubscribe(s) +func (r *RouteTable) Subscribe(ctx context.Context) <-chan string { + ret := r.changeNotification.Subscribe(nil) + go func() { + <-ctx.Done() + r.changeNotification.Unsubscribe(ret) + }() + return ret } func extractRoutes(ctx context.Context, sch *schema.Schema) RouteView { diff --git a/internal/routing/verb_routing.go b/internal/routing/verb_routing.go index 7d7bd89906..f784c208b2 100644 --- a/internal/routing/verb_routing.go +++ b/internal/routing/verb_routing.go @@ -95,11 +95,9 @@ func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelin moduleClients: xsync.NewMapOf[string, optional.Option[ftlv1connect.VerbServiceClient]](), timelineClient: timelineClient, } - routeUpdates := svc.routingTable.Subscribe() logger := log.FromContext(ctx) go func() { - defer svc.routingTable.Unsubscribe(routeUpdates) - for module := range channels.IterContext(ctx, routeUpdates) { + for module := range channels.IterSubscribable(ctx, routeTable) { logger.Tracef("Removing client for module %s", module) svc.moduleClients.Delete(module) } diff --git a/internal/schema/schemaeventsource/schemaeventsource.go b/internal/schema/schemaeventsource/schemaeventsource.go index 5a482fc9d9..6546fee924 100644 --- a/internal/schema/schemaeventsource/schemaeventsource.go +++ b/internal/schema/schemaeventsource/schemaeventsource.go @@ -2,6 +2,7 @@ package schemaeventsource import ( "context" + "github.com/block/ftl/internal/channels" "slices" "sync" "time" @@ -22,6 +23,8 @@ import ( "github.com/block/ftl/internal/rpc" ) +var _ channels.Subscribable[schema.Notification] = (*EventSource)(nil) + type PullSchemaClient interface { PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error) Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) From 8747f2660c5d490daba5436cc93638c1f411f3e5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 4 Jun 2025 04:38:25 +0000 Subject: [PATCH 2/2] chore(autofmt): Automated formatting --- internal/schema/schemaeventsource/schemaeventsource.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/schema/schemaeventsource/schemaeventsource.go b/internal/schema/schemaeventsource/schemaeventsource.go index 6546fee924..16d9675b20 100644 --- a/internal/schema/schemaeventsource/schemaeventsource.go +++ b/internal/schema/schemaeventsource/schemaeventsource.go @@ -2,7 +2,6 @@ package schemaeventsource import ( "context" - "github.com/block/ftl/internal/channels" "slices" "sync" "time" @@ -20,6 +19,7 @@ import ( "github.com/block/ftl/common/reflect" "github.com/block/ftl/common/schema" islices "github.com/block/ftl/common/slices" + "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/rpc" )