Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/confidential-module-plumbing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add ConfidentialModule and attributes plumbing for confidential CRE workflows #added #db_update #wip
1 change: 1 addition & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ type WorkflowSpec struct {
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Attributes []byte `db:"attributes"`
sdkWorkflow *sdk.WorkflowSpec
rawSpec []byte
config []byte
Expand Down
9 changes: 6 additions & 3 deletions core/services/workflows/artifacts/v2/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url,
created_at,
updated_at,
spec_type
spec_type,
attributes
) VALUES (
:workflow,
:config,
Expand All @@ -71,7 +72,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
:config_url,
:created_at,
:updated_at,
:spec_type
:spec_type,
:attributes
) ON CONFLICT (workflow_id) DO UPDATE
SET
workflow = EXCLUDED.workflow,
Expand All @@ -84,7 +86,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url = EXCLUDED.config_url,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
attributes = EXCLUDED.attributes
RETURNING id
`

Expand Down
115 changes: 115 additions & 0 deletions core/services/workflows/syncer/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
Attributes: payload.Attributes,
}

if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil {
Expand Down Expand Up @@ -697,6 +698,15 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return fmt.Errorf("invalid workflow name: %w", err)
}

confidential, err := v2.IsConfidential(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}
if confidential {
h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
return h.tryConfidentialEngineCreate(ctx, spec, wid, workflowName, decodedBinary, source)
}

// Create a channel to receive the initialization result.
// This allows us to wait for the engine to complete initialization (including trigger subscriptions)
// before emitting the workflowActivated event, ensuring the event accurately reflects deployment status.
Expand Down Expand Up @@ -768,6 +778,111 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return nil
}

// tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule
// instead of a local WASM module. The ConfidentialModule delegates execution to
// the confidential-workflows capability which runs the WASM inside a TEE.
func (h *eventHandler) tryConfidentialEngineCreate(
ctx context.Context,
spec *job.WorkflowSpec,
wid types.WorkflowID,
workflowName types.WorkflowName,
decodedBinary []byte,
source string,
) error {
attrs, err := v2.ParseWorkflowAttributes(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}

binaryHash := v2.ComputeBinaryHash(decodedBinary)

lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule")
lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner)

module := v2.NewConfidentialModule(
h.capRegistry,
spec.BinaryURL,
binaryHash,
spec.WorkflowID,
spec.WorkflowOwner,
workflowName.String(),
spec.WorkflowTag,
attrs.VaultDonSecrets,
lggr,
)

initDone := make(chan error, 1)

cfg := &v2.EngineConfig{
Lggr: h.lggr,
Module: module,
WorkflowConfig: []byte(spec.Config),
CapRegistry: h.capRegistry,
DonSubscriber: h.workflowDonSubscriber,
UseLocalTimeProvider: h.useLocalTimeProvider,
DonTimeStore: h.donTimeStore,
ExecutionsStore: h.workflowStore,
WorkflowID: spec.WorkflowID,
WorkflowOwner: spec.WorkflowOwner,
WorkflowName: workflowName,
WorkflowTag: spec.WorkflowTag,
WorkflowEncryptionKey: h.workflowEncryptionKey,

LocalLimits: v2.EngineLimits{},
LocalLimiters: h.engineLimiters,
GlobalExecutionConcurrencyLimiter: h.workflowLimits,

BeholderEmitter: h.emitter,
BillingClient: h.billingClient,

WorkflowRegistryAddress: h.workflowRegistryAddress,
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
OrgResolver: h.orgResolver,
SecretsFetcher: h.secretsFetcher,
}

existingHook := cfg.Hooks.OnInitialized
cfg.Hooks.OnInitialized = func(err error) {
initDone <- err
if existingHook != nil {
existingHook(err)
}
}

engine, err := v2.NewEngine(cfg)
if err != nil {
return fmt.Errorf("failed to create confidential workflow engine: %w", err)
}

if err = engine.Start(ctx); err != nil {
return fmt.Errorf("failed to start confidential workflow engine: %w", err)
}

select {
case <-ctx.Done():
if closeErr := engine.Close(); closeErr != nil {
h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID)
}
return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err())
case initErr := <-initDone:
if initErr != nil {
if closeErr := engine.Close(); closeErr != nil {
h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID)
}
return fmt.Errorf("engine initialization failed: %w", initErr)
}
}

if err := h.engineRegistry.Add(wid, source, engine); err != nil {
if closeErr := engine.Close(); closeErr != nil {
return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err)
}
return fmt.Errorf("invariant violation: %w", err)
}

return nil
}
Comment on lines +781 to +884
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tryConfidentialEngineCreate function lacks test coverage. The codebase shows comprehensive testing for tryEngineCreate and other handler flows. Consider adding tests that verify: (1) confidential engine creation when IsConfidential returns true, (2) proper initialization and lifecycle hooks, (3) error handling during engine creation and startup, and (4) integration with the engine registry.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added three handler-level tests in Test_workflowRegisteredHandler_confidentialRouting: (1) confidential attributes bypass the engine factory and route to the confidential path, (2) non-confidential attributes use the engine factory, (3) malformed attributes return a parse error.


// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
Expand Down
Loading
Loading