Add ConfidentialModule for confidential workflow execution#21298
Add ConfidentialModule for confidential workflow execution#21298
Conversation
…lows - DB migration 0291: add attributes bytea column to workflow_specs_v2 - WorkflowSpec: add Attributes field, persist through ORM - Handler: store payload.Attributes, route confidential workflows to dedicated engine creation path (tryConfidentialEngineCreate) - ConfidentialModule: host.ModuleV2 impl that delegates execution to the confidential-workflows@1.0.0-alpha capability via CapabilitiesRegistry - Plugin registration for confidential-workflows in plugins.private.yaml
|
👋 nadahalli, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks! |
|
✅ No conflicts with other open PRs targeting |
There was a problem hiding this comment.
Pull request overview
This PR adds support for confidential workflow execution by introducing a new ConfidentialModule that delegates WASM execution to a Trusted Execution Environment (TEE) via the confidential-workflows@1.0.0-alpha capability. The implementation extends the existing workflow infrastructure to support workflows marked as confidential in their attributes.
Changes:
- Added database support for workflow attributes via new
attributesbytea column - Implemented
ConfidentialModuleto delegate execution to confidential compute capability - Added routing logic to direct confidential workflows to TEE execution path
- Registered
confidential-workflowsLOOP plugin for confidential compute integration
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| core/store/migrate/migrations/0291_add_workflow_attributes_column.sql | Database migration adding attributes bytea column to workflow_specs_v2 table |
| core/services/job/models.go | Added Attributes []byte field to WorkflowSpec struct |
| core/services/workflows/artifacts/v2/orm.go | Updated ORM to persist and retrieve the new attributes column |
| core/services/workflows/v2/confidential_module.go | New module implementing host.ModuleV2 for confidential workflow execution via capability delegation |
| core/services/workflows/syncer/v2/handler.go | Added routing logic and tryConfidentialEngineCreate to handle confidential workflows |
| plugins/plugins.private.yaml | Registered confidential-workflows LOOP plugin |
| go.mod | Updated chainlink-protos/cre/go dependency and added local replace directive for chainlink-common |
| go.sum | Updated checksums for dependency changes |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // IsConfidential returns true if the Attributes JSON has "confidential": true. | ||
| func IsConfidential(data []byte) bool { | ||
| attrs, err := ParseWorkflowAttributes(data) | ||
| if err != nil { | ||
| return false | ||
| } | ||
| return attrs.Confidential | ||
| } |
There was a problem hiding this comment.
The IsConfidential function silently returns false when ParseWorkflowAttributes fails. This could hide configuration errors and cause workflows intended to be confidential to execute non-confidentially. Consider logging the parsing error or exposing it to callers so that malformed attributes are caught early rather than silently ignored.
There was a problem hiding this comment.
Fixed. IsConfidential now returns (bool, error) so callers fail loudly on malformed attributes instead of silently falling through to non-confidential execution.
| @@ -0,0 +1,5 @@ | |||
| -- +goose Up | |||
| ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea DEFAULT ''; | |||
There was a problem hiding this comment.
The default value for the attributes column is set to an empty string (''). For a bytea column, this creates a zero-length byte array rather than NULL. Consider whether NULL would be more appropriate for workflows without attributes, which would be consistent with how empty/missing data is typically represented in SQL. If an empty byte array is intentional, this is fine, but ensure that ParseWorkflowAttributes handles both nil and empty byte arrays correctly (which it does on line 39).
| ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea DEFAULT ''; | |
| ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea; |
There was a problem hiding this comment.
Leaving as-is. ParseWorkflowAttributes handles both nil and empty byte arrays identically via len(data) == 0, so the behavior is correct either way. Using an empty default keeps existing rows consistent without NULL semantics.
|
|
||
| protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets)) | ||
| for i, s := range m.vaultDonSecrets { | ||
| ns := s.Namespace |
There was a problem hiding this comment.
The code defaults an empty namespace to "main" on line 116. This default appears to be a business logic decision, but it's not documented. Consider adding a comment explaining why "main" is the default namespace and whether this aligns with the VaultDON's expected behavior. This will help future maintainers understand the implicit contract.
| ns := s.Namespace | |
| ns := s.Namespace | |
| // Default to the "main" namespace when none is provided. VaultDON and the | |
| // confidential workflows capability treat "main" as the canonical default | |
| // namespace for secrets, so leaving this empty would not match the expected | |
| // behavior of downstream components. |
There was a problem hiding this comment.
Added comment: "VaultDON treats 'main' as the default namespace for secrets."
| // tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule | ||
| // instead of a local WASM module. The ConfidentialModule delegates execution to | ||
| // the confidential-workflows capability which runs the WASM inside a TEE. | ||
| func (h *eventHandler) tryConfidentialEngineCreate( | ||
| ctx context.Context, | ||
| spec *job.WorkflowSpec, | ||
| wid types.WorkflowID, | ||
| workflowName types.WorkflowName, | ||
| decodedBinary []byte, | ||
| source string, | ||
| ) error { | ||
| attrs, err := v2.ParseWorkflowAttributes(spec.Attributes) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to parse workflow attributes: %w", err) | ||
| } | ||
|
|
||
| binaryHash := v2.ComputeBinaryHash(decodedBinary) | ||
|
|
||
| lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule") | ||
| lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner) | ||
|
|
||
| module := v2.NewConfidentialModule( | ||
| h.capRegistry, | ||
| spec.BinaryURL, | ||
| binaryHash, | ||
| spec.WorkflowID, | ||
| spec.WorkflowOwner, | ||
| workflowName.String(), | ||
| spec.WorkflowTag, | ||
| attrs.VaultDonSecrets, | ||
| lggr, | ||
| ) | ||
|
|
||
| initDone := make(chan error, 1) | ||
|
|
||
| cfg := &v2.EngineConfig{ | ||
| Lggr: h.lggr, | ||
| Module: module, | ||
| WorkflowConfig: []byte(spec.Config), | ||
| CapRegistry: h.capRegistry, | ||
| DonSubscriber: h.workflowDonSubscriber, | ||
| UseLocalTimeProvider: h.useLocalTimeProvider, | ||
| DonTimeStore: h.donTimeStore, | ||
| ExecutionsStore: h.workflowStore, | ||
| WorkflowID: spec.WorkflowID, | ||
| WorkflowOwner: spec.WorkflowOwner, | ||
| WorkflowName: workflowName, | ||
| WorkflowTag: spec.WorkflowTag, | ||
| WorkflowEncryptionKey: h.workflowEncryptionKey, | ||
|
|
||
| LocalLimits: v2.EngineLimits{}, | ||
| LocalLimiters: h.engineLimiters, | ||
| GlobalExecutionConcurrencyLimiter: h.workflowLimits, | ||
|
|
||
| BeholderEmitter: h.emitter, | ||
| BillingClient: h.billingClient, | ||
|
|
||
| WorkflowRegistryAddress: h.workflowRegistryAddress, | ||
| WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, | ||
| OrgResolver: h.orgResolver, | ||
| SecretsFetcher: h.secretsFetcher, | ||
| } | ||
|
|
||
| existingHook := cfg.Hooks.OnInitialized | ||
| cfg.Hooks.OnInitialized = func(err error) { | ||
| initDone <- err | ||
| if existingHook != nil { | ||
| existingHook(err) | ||
| } | ||
| } | ||
|
|
||
| engine, err := v2.NewEngine(cfg) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create confidential workflow engine: %w", err) | ||
| } | ||
|
|
||
| if err = engine.Start(ctx); err != nil { | ||
| return fmt.Errorf("failed to start confidential workflow engine: %w", err) | ||
| } | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| if closeErr := engine.Close(); closeErr != nil { | ||
| h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID) | ||
| } | ||
| return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err()) | ||
| case initErr := <-initDone: | ||
| if initErr != nil { | ||
| if closeErr := engine.Close(); closeErr != nil { | ||
| h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID) | ||
| } | ||
| return fmt.Errorf("engine initialization failed: %w", initErr) | ||
| } | ||
| } | ||
|
|
||
| if err := h.engineRegistry.Add(wid, source, engine); err != nil { | ||
| if closeErr := engine.Close(); closeErr != nil { | ||
| return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err) | ||
| } | ||
| return fmt.Errorf("invariant violation: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
The tryConfidentialEngineCreate function lacks test coverage. The codebase shows comprehensive testing for tryEngineCreate and other handler flows. Consider adding tests that verify: (1) confidential engine creation when IsConfidential returns true, (2) proper initialization and lifecycle hooks, (3) error handling during engine creation and startup, and (4) integration with the engine registry.
There was a problem hiding this comment.
Added three handler-level tests in Test_workflowRegisteredHandler_confidentialRouting: (1) confidential attributes bypass the engine factory and route to the confidential path, (2) non-confidential attributes use the engine factory, (3) malformed attributes return a parse error.
| package v2 | ||
|
|
||
| import ( | ||
| "context" | ||
| "crypto/sha256" | ||
| "encoding/json" | ||
| "fmt" | ||
|
|
||
| "google.golang.org/protobuf/proto" | ||
| "google.golang.org/protobuf/types/known/anypb" | ||
|
|
||
| "github.com/smartcontractkit/chainlink-common/pkg/capabilities" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/logger" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/types/core" | ||
| "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" | ||
|
|
||
| confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" | ||
| sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" | ||
| ) | ||
|
|
||
| const confidentialWorkflowsCapabilityID = "confidential-workflows@1.0.0-alpha" | ||
|
|
||
| // WorkflowAttributes is the JSON structure stored in WorkflowSpec.Attributes. | ||
| type WorkflowAttributes struct { | ||
| Confidential bool `json:"confidential"` | ||
| VaultDonSecrets []SecretIdentifier `json:"vault_don_secrets"` | ||
| } | ||
|
|
||
| // SecretIdentifier identifies a secret in VaultDON. | ||
| type SecretIdentifier struct { | ||
| Key string `json:"key"` | ||
| Namespace string `json:"namespace,omitempty"` | ||
| } | ||
|
|
||
| // ParseWorkflowAttributes parses the Attributes JSON from a WorkflowSpec. | ||
| // Returns a zero-value struct if data is nil or empty. | ||
| func ParseWorkflowAttributes(data []byte) (WorkflowAttributes, error) { | ||
| var attrs WorkflowAttributes | ||
| if len(data) == 0 { | ||
| return attrs, nil | ||
| } | ||
| if err := json.Unmarshal(data, &attrs); err != nil { | ||
| return attrs, fmt.Errorf("failed to parse workflow attributes: %w", err) | ||
| } | ||
| return attrs, nil | ||
| } | ||
|
|
||
| // IsConfidential returns true if the Attributes JSON has "confidential": true. | ||
| func IsConfidential(data []byte) bool { | ||
| attrs, err := ParseWorkflowAttributes(data) | ||
| if err != nil { | ||
| return false | ||
| } | ||
| return attrs.Confidential | ||
| } | ||
|
|
||
| // ConfidentialModule implements host.ModuleV2 for confidential workflows. | ||
| // Instead of running WASM locally, it delegates execution to the | ||
| // confidential-workflows capability via the CapabilitiesRegistry. | ||
| type ConfidentialModule struct { | ||
| capRegistry core.CapabilitiesRegistry | ||
| binaryURL string | ||
| binaryHash []byte | ||
| workflowID string | ||
| workflowOwner string | ||
| workflowName string | ||
| workflowTag string | ||
| vaultDonSecrets []SecretIdentifier | ||
| lggr logger.Logger | ||
| } | ||
|
|
||
| var _ host.ModuleV2 = (*ConfidentialModule)(nil) | ||
|
|
||
| func NewConfidentialModule( | ||
| capRegistry core.CapabilitiesRegistry, | ||
| binaryURL string, | ||
| binaryHash []byte, | ||
| workflowID string, | ||
| workflowOwner string, | ||
| workflowName string, | ||
| workflowTag string, | ||
| vaultDonSecrets []SecretIdentifier, | ||
| lggr logger.Logger, | ||
| ) *ConfidentialModule { | ||
| return &ConfidentialModule{ | ||
| capRegistry: capRegistry, | ||
| binaryURL: binaryURL, | ||
| binaryHash: binaryHash, | ||
| workflowID: workflowID, | ||
| workflowOwner: workflowOwner, | ||
| workflowName: workflowName, | ||
| workflowTag: workflowTag, | ||
| vaultDonSecrets: vaultDonSecrets, | ||
| lggr: lggr, | ||
| } | ||
| } | ||
|
|
||
| func (m *ConfidentialModule) Start() {} | ||
| func (m *ConfidentialModule) Close() {} | ||
| func (m *ConfidentialModule) IsLegacyDAG() bool { return false } | ||
|
|
||
| func (m *ConfidentialModule) Execute( | ||
| ctx context.Context, | ||
| request *sdkpb.ExecuteRequest, | ||
| _ host.ExecutionHelper, | ||
| ) (*sdkpb.ExecutionResult, error) { | ||
| execReqBytes, err := proto.Marshal(request) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to marshal ExecuteRequest: %w", err) | ||
| } | ||
|
|
||
| protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets)) | ||
| for i, s := range m.vaultDonSecrets { | ||
| ns := s.Namespace | ||
| if ns == "" { | ||
| ns = "main" | ||
| } | ||
| protoSecrets[i] = &confworkflowtypes.SecretIdentifier{ | ||
| Key: s.Key, | ||
| Namespace: proto.String(ns), | ||
| } | ||
| } | ||
|
|
||
| capInput := &confworkflowtypes.ConfidentialWorkflowRequest{ | ||
| VaultDonSecrets: protoSecrets, | ||
| Execution: &confworkflowtypes.WorkflowExecution{ | ||
| WorkflowId: m.workflowID, | ||
| BinaryUrl: m.binaryURL, | ||
| BinaryHash: m.binaryHash, | ||
| ExecuteRequest: execReqBytes, | ||
| }, | ||
| } | ||
|
|
||
| payload, err := anypb.New(capInput) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to marshal capability payload: %w", err) | ||
| } | ||
|
|
||
| cap, err := m.capRegistry.GetExecutable(ctx, confidentialWorkflowsCapabilityID) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to get confidential-workflows capability: %w", err) | ||
| } | ||
|
|
||
| capReq := capabilities.CapabilityRequest{ | ||
| Payload: payload, | ||
| Method: "Execute", | ||
| CapabilityId: confidentialWorkflowsCapabilityID, | ||
| Metadata: capabilities.RequestMetadata{ | ||
| WorkflowID: m.workflowID, | ||
| WorkflowOwner: m.workflowOwner, | ||
| WorkflowName: m.workflowName, | ||
| WorkflowTag: m.workflowTag, | ||
| }, | ||
| } | ||
|
|
||
| capResp, err := cap.Execute(ctx, capReq) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("confidential-workflows capability execution failed: %w", err) | ||
| } | ||
|
|
||
| if capResp.Payload == nil { | ||
| return nil, fmt.Errorf("confidential-workflows capability returned nil payload") | ||
| } | ||
|
|
||
| var confResp confworkflowtypes.ConfidentialWorkflowResponse | ||
| if err := capResp.Payload.UnmarshalTo(&confResp); err != nil { | ||
| return nil, fmt.Errorf("failed to unmarshal capability response: %w", err) | ||
| } | ||
|
|
||
| var result sdkpb.ExecutionResult | ||
| if err := proto.Unmarshal(confResp.ExecutionResult, &result); err != nil { | ||
| return nil, fmt.Errorf("failed to unmarshal ExecutionResult: %w", err) | ||
| } | ||
|
|
||
| return &result, nil | ||
| } | ||
|
|
||
| // ComputeBinaryHash returns the SHA-256 hash of the given binary. | ||
| func ComputeBinaryHash(binary []byte) []byte { | ||
| h := sha256.Sum256(binary) | ||
| return h[:] | ||
| } |
There was a problem hiding this comment.
The new ConfidentialModule and tryConfidentialEngineCreate function lack test coverage. The codebase shows comprehensive testing for other modules and engine creation flows. Consider adding tests that verify: (1) confidential module creation and configuration, (2) routing logic for workflows with confidential attributes, (3) error handling when the capability is not available, and (4) proper passing of vault secrets and binary hash.
There was a problem hiding this comment.
Added confidential_module_test.go with tests for ParseWorkflowAttributes, IsConfidential, ComputeBinaryHash, and ConfidentialModule.Execute (success, error paths, namespace defaulting, request field forwarding, nil payload).
| return fmt.Errorf("invalid workflow name: %w", err) | ||
| } | ||
|
|
||
| if v2.IsConfidential(spec.Attributes) { |
There was a problem hiding this comment.
Consider adding a log statement when routing to the confidential workflow path (after line 701). This would help with debugging and observability by making it clear when a workflow is being executed confidentially. For example: h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
| if v2.IsConfidential(spec.Attributes) { | |
| if v2.IsConfidential(spec.Attributes) { | |
| h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) |
There was a problem hiding this comment.
Added: h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
- IsConfidential now returns (bool, error) instead of silently swallowing malformed attributes JSON - Add info log when routing workflow to confidential execution - Add unit tests for ParseWorkflowAttributes, IsConfidential, ComputeBinaryHash, ConfidentialModule.Execute (success, error paths, namespace defaulting, request field forwarding)
- Add handler-level tests verifying confidential routing: confidential attributes bypass engine factory, non-confidential uses it, malformed attributes return error - Add comment documenting "main" as VaultDON default namespace
Replace the local ../chainlink-common replace directive with a proper module reference to commit 177ddc60abbe on the tejaswi/confidential-workflows-codegen branch. Also fix Namespace field assignment (string, not *string) to match the published proto.
|
|
Implementation plan: Confidential CRE Workflows (full PR chain and cross-repo dependencies) |




Summary
When a workflow is deployed with
"confidential": truein its Attributes, the engine uses aConfidentialModule(a newhost.ModuleV2impl) instead of the WASM host module. The ConfidentialModule delegates both subscribe and trigger execution to theconfidential-workflows@1.0.0-alphacapability, which runs as a LOOP plugin.Changes
attributes byteacolumn toworkflow_specs_v2AttributesfromWorkflowRegisteredEventthrough to the database and engine creationConfidentialModule:host.ModuleV2impl that serializes theExecuteRequest, builds aConfidentialWorkflowRequest, and calls the capability viaCapabilitiesRegistrytryEngineCreate()checksIsConfidential(spec.Attributes)and branches totryConfidentialEngineCreate(), which builds a V2 engine with theConfidentialModuleinstead of a WASM moduleconfidential-workflowstoplugins.private.yamlWhat this does NOT change
engine.go): the engine callsModule.Execute()the same way regardless of module typeEngineConfig: no new fields; theModulefield already accepts anyhost.ModuleV2ExecutionHelper/SecretsFetcher: ConfidentialModule ignores both; the enclave has its ownAttributes JSON format
Set by the CLI at deploy time:
{ "confidential": true, "vault_don_secrets": [ {"key": "API_KEY"}, {"key": "SIGNING_KEY", "namespace": "custom-ns"} ] }Design decision: Binary hash
SHA-256 is computed from the binary in WorkflowSpec, not from Attributes. The enclave re-verifies against the fetched binary, so this is defense-in-depth.
Links
confidential-compute/enclave/apps/confidential-workflows/capability