Skip to content
This repository was archived by the owner on Aug 17, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions backend/admin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (s *Service) ApplyChangeset(ctx context.Context, req *connect.Request[admin
}); err != nil {
return errors.Wrap(err, "failed to send changeset")
}
for e := range channels.IterContext(ctx, s.source.Subscribe(ctx)) {
for e := range channels.IterSubscribable[schema.Notification](ctx, s.source) {
switch event := e.(type) {
case *schema.ChangesetFinalizedNotification:
if event.Key != key {
Expand Down Expand Up @@ -589,8 +589,7 @@ func (s *Service) ApplyChangeset(ctx context.Context, req *connect.Request[admin
}

func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest], resp *connect.ServerStream[ftlv1.PullSchemaResponse]) error {
events := s.source.Subscribe(ctx)
for event := range channels.IterContext(ctx, events) {
for event := range channels.IterSubscribable[schema.Notification](ctx, s.source) {
switch e := event.(type) {
case *schema.FullSchemaNotification:
proto := e.ToProto()
Expand Down
3 changes: 1 addition & 2 deletions backend/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,7 @@ func (s *Service) StreamModules(ctx context.Context, req *connect.Request[consol
return errors.WithStack(err)
}

events := s.schemaEventSource.Subscribe(ctx)
for range channels.IterContext(ctx, events) {
for range channels.IterSubscribable[schema.Notification](ctx, s.schemaEventSource) {
err = s.sendStreamModulesResp(stream)
if err != nil {
return errors.WithStack(err)
Expand Down
2 changes: 1 addition & 1 deletion backend/ingress/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func syncView(ctx context.Context, schemaEventSource *schemaeventsource.EventSou
})
logger.Debugf("Starting routing sync from schema")
go func() {
for range channels.IterContext(ctx, schemaEventSource.Subscribe(ctx)) {
for range channels.IterSubscribable[schema.Notification](ctx, schemaEventSource) {
state := extractIngressRoutingEntries(schemaEventSource.CanonicalView())
out.Store(state)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func Start(
return errors.WithStack(err)
}

for event := range channels.IterContext(ctx, svc.eventSource.Subscribe(ctx)) {
for event := range channels.IterSubscribable[schema.Notification](ctx, svc.eventSource) {
go func() {
switch e := event.(type) {
case *schema.ChangesetCreatedNotification:
Expand Down
3 changes: 1 addition & 2 deletions backend/schemamirror/schema_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func (s *SchemaService) PullSchema(ctx context.Context, req *connect.Request[ftl
if !s.mirror.receiving.Load() {
return connect.NewError(connect.CodeUnavailable, errors.New("mirror is not receiving schema updates"))
}
updates := s.mirror.eventSource.Subscribe(ctx)
if err := stream.Send(&ftlv1.PullSchemaResponse{
Event: &schemapb.Notification{
Value: &schemapb.Notification_FullSchemaNotification{
Expand All @@ -67,7 +66,7 @@ func (s *SchemaService) PullSchema(ctx context.Context, req *connect.Request[ftl
}); err != nil {
return errors.Wrap(err, "failed to send initial schema")
}
for event := range channels.IterContext(ctx, updates) {
for event := range channels.IterSubscribable[schema.Notification](ctx, s.mirror.eventSource) {
if err := stream.Send(&ftlv1.PullSchemaResponse{
Event: schema.NotificationToProto(event),
}); err != nil {
Expand Down
18 changes: 18 additions & 0 deletions internal/channels/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package channels

import (
"context"
"iter"
)

// Subscribable represents an object that can be subscribed too until the context is cancelled
type Subscribable[T any] interface {
Subscribe(ctx context.Context) <-chan T
}

// IterSubscribable subscribes to the object and iterates over events until the context is cancelled
//
// Check ctx.Err() != nil to detect if the context was cancelled.
func IterSubscribable[T any](ctx context.Context, subscribable Subscribable[T]) iter.Seq[T] {
return IterContext(ctx, subscribable.Subscribe(ctx))
}
3 changes: 1 addition & 2 deletions internal/deploymentcontext/deployment_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ type SecretsProvider func(ctx context.Context) (map[string][]byte, error)
type ConfigProvider func(ctx context.Context) (map[string][]byte, error)

type RouteProvider interface {
Subscribe() chan string
Unsubscribe(c chan string)
Subscribe(ctx context.Context) <-chan string
Route(module string) string
}

Expand Down
12 changes: 3 additions & 9 deletions internal/deploymentcontext/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,8 @@ func (r *routeTableRouting) Route(module string) string {
}

// Subscribe implements RouteProvider.
func (r *routeTableRouting) Subscribe() chan string {
return r.table.Subscribe()
}

// Unsubscribe implements RouteProvider.
func (r *routeTableRouting) Unsubscribe(c chan string) {
r.table.Unsubscribe(c)
func (r *routeTableRouting) Subscribe(ctx context.Context) <-chan string {
return r.table.Subscribe(ctx)
}

// NewProvider retrieves config, secrets and DSNs for a module.
Expand All @@ -76,7 +71,7 @@ func NewProvider(key key.Deployment, routeProvider RouteProvider, moduleSchema *

ret := make(chan DeploymentContext, 16)
logger := log.FromContext(ctx)
updates := routeProvider.Subscribe()
updates := routeProvider.Subscribe(ctx)
module := moduleSchema.Name

// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
Expand Down Expand Up @@ -111,7 +106,6 @@ func NewProvider(key key.Deployment, routeProvider RouteProvider, moduleSchema *
configs := map[string][]byte{}
secrets := map[string][]byte{}
go func() {
defer routeProvider.Unsubscribe(updates)

for {
h := sha.New()
Expand Down
15 changes: 8 additions & 7 deletions internal/deploymentcontext/template_routing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package deploymentcontext

import (
"context"
"os"
)

Expand Down Expand Up @@ -29,11 +30,11 @@ func (t *templateRouteTable) Route(module string) string {
}

// Subscribe implements deploymentcontext.RouteProvider.
func (t *templateRouteTable) Subscribe() chan string {
return make(chan string)
}

// Unsubscribe implements deploymentcontext.RouteProvider.
func (t *templateRouteTable) Unsubscribe(c chan string) {
close(c)
func (t *templateRouteTable) Subscribe(ctx context.Context) <-chan string {
ret := make(chan string)
go func() {
<-ctx.Done()
close(ret)
}()
return ret
}
16 changes: 10 additions & 6 deletions internal/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/block/ftl/internal/schema/schemaeventsource"
)

var _ channels.Subscribable[string] = (*RouteTable)(nil)

type RouteView struct {
byDeployment map[string]*url.URL
moduleToDeployment map[string]key.Deployment
Expand All @@ -37,7 +39,7 @@ func New(ctx context.Context, changes *schemaeventsource.EventSource) *RouteTabl
}

func (r *RouteTable) run(ctx context.Context, changes *schemaeventsource.EventSource) {
for event := range channels.IterContext(ctx, changes.Subscribe(ctx)) {
for event := range channels.IterSubscribable[schema.Notification](ctx, changes) {
logger := log.FromContext(ctx)
logger.Tracef("Received schema event: %T", event)
old := r.routes.Load()
Expand Down Expand Up @@ -90,11 +92,13 @@ func (r RouteView) Schema() *schema.Schema {
return r.schema
}

func (r *RouteTable) Subscribe() chan string {
return r.changeNotification.Subscribe(nil)
}
func (r *RouteTable) Unsubscribe(s chan string) {
r.changeNotification.Unsubscribe(s)
func (r *RouteTable) Subscribe(ctx context.Context) <-chan string {
ret := r.changeNotification.Subscribe(nil)
go func() {
<-ctx.Done()
r.changeNotification.Unsubscribe(ret)
}()
return ret
}

func extractRoutes(ctx context.Context, sch *schema.Schema) RouteView {
Expand Down
4 changes: 1 addition & 3 deletions internal/routing/verb_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,9 @@ func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelin
moduleClients: xsync.NewMapOf[string, optional.Option[ftlv1connect.VerbServiceClient]](),
timelineClient: timelineClient,
}
routeUpdates := svc.routingTable.Subscribe()
logger := log.FromContext(ctx)
go func() {
defer svc.routingTable.Unsubscribe(routeUpdates)
for module := range channels.IterContext(ctx, routeUpdates) {
for module := range channels.IterSubscribable(ctx, routeTable) {
logger.Tracef("Removing client for module %s", module)
svc.moduleClients.Delete(module)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/schema/schemaeventsource/schemaeventsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"github.com/block/ftl/common/reflect"
"github.com/block/ftl/common/schema"
islices "github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/rpc"
)

var _ channels.Subscribable[schema.Notification] = (*EventSource)(nil)

type PullSchemaClient interface {
PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error)
Expand Down
Loading