diff --git a/.changeset/confidential-module-plumbing.md b/.changeset/confidential-module-plumbing.md new file mode 100644 index 00000000000..8079b1a06d5 --- /dev/null +++ b/.changeset/confidential-module-plumbing.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add ConfidentialModule and attributes plumbing for confidential CRE workflows #added #db_update #wip diff --git a/core/services/job/models.go b/core/services/job/models.go index 20e7da2c643..2d257a33ca5 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -930,6 +930,7 @@ type WorkflowSpec struct { CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + Attributes []byte `db:"attributes"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/workflows/artifacts/v2/orm.go b/core/services/workflows/artifacts/v2/orm.go index 543dab5da83..cc213898678 100644 --- a/core/services/workflows/artifacts/v2/orm.go +++ b/core/services/workflows/artifacts/v2/orm.go @@ -58,7 +58,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url, created_at, updated_at, - spec_type + spec_type, + attributes ) VALUES ( :workflow, :config, @@ -71,7 +72,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) :config_url, :created_at, :updated_at, - :spec_type + :spec_type, + :attributes ) ON CONFLICT (workflow_id) DO UPDATE SET workflow = EXCLUDED.workflow, @@ -84,7 +86,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) config_url = EXCLUDED.config_url, created_at = EXCLUDED.created_at, updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type + spec_type = EXCLUDED.spec_type, + attributes = EXCLUDED.attributes RETURNING id ` diff --git a/core/services/workflows/syncer/v2/handler.go b/core/services/workflows/syncer/v2/handler.go index 979f3b7a861..9e47dbaf332 100644 --- a/core/services/workflows/syncer/v2/handler.go +++ b/core/services/workflows/syncer/v2/handler.go @@ -473,6 +473,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR SpecType: job.WASMFile, BinaryURL: payload.BinaryURL, ConfigURL: payload.ConfigURL, + Attributes: payload.Attributes, } if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil { @@ -697,6 +698,15 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return fmt.Errorf("invalid workflow name: %w", err) } + confidential, err := v2.IsConfidential(spec.Attributes) + if err != nil { + return fmt.Errorf("failed to parse workflow attributes: %w", err) + } + if confidential { + h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID) + return h.tryConfidentialEngineCreate(ctx, spec, wid, workflowName, decodedBinary, source) + } + // Create a channel to receive the initialization result. // This allows us to wait for the engine to complete initialization (including trigger subscriptions) // before emitting the workflowActivated event, ensuring the event accurately reflects deployment status. @@ -768,6 +778,111 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp return nil } +// tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule +// instead of a local WASM module. The ConfidentialModule delegates execution to +// the confidential-workflows capability which runs the WASM inside a TEE. +func (h *eventHandler) tryConfidentialEngineCreate( + ctx context.Context, + spec *job.WorkflowSpec, + wid types.WorkflowID, + workflowName types.WorkflowName, + decodedBinary []byte, + source string, +) error { + attrs, err := v2.ParseWorkflowAttributes(spec.Attributes) + if err != nil { + return fmt.Errorf("failed to parse workflow attributes: %w", err) + } + + binaryHash := v2.ComputeBinaryHash(decodedBinary) + + lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule") + lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner) + + module := v2.NewConfidentialModule( + h.capRegistry, + spec.BinaryURL, + binaryHash, + spec.WorkflowID, + spec.WorkflowOwner, + workflowName.String(), + spec.WorkflowTag, + attrs.VaultDonSecrets, + lggr, + ) + + initDone := make(chan error, 1) + + cfg := &v2.EngineConfig{ + Lggr: h.lggr, + Module: module, + WorkflowConfig: []byte(spec.Config), + CapRegistry: h.capRegistry, + DonSubscriber: h.workflowDonSubscriber, + UseLocalTimeProvider: h.useLocalTimeProvider, + DonTimeStore: h.donTimeStore, + ExecutionsStore: h.workflowStore, + WorkflowID: spec.WorkflowID, + WorkflowOwner: spec.WorkflowOwner, + WorkflowName: workflowName, + WorkflowTag: spec.WorkflowTag, + WorkflowEncryptionKey: h.workflowEncryptionKey, + + LocalLimits: v2.EngineLimits{}, + LocalLimiters: h.engineLimiters, + GlobalExecutionConcurrencyLimiter: h.workflowLimits, + + BeholderEmitter: h.emitter, + BillingClient: h.billingClient, + + WorkflowRegistryAddress: h.workflowRegistryAddress, + WorkflowRegistryChainSelector: h.workflowRegistryChainSelector, + OrgResolver: h.orgResolver, + SecretsFetcher: h.secretsFetcher, + } + + existingHook := cfg.Hooks.OnInitialized + cfg.Hooks.OnInitialized = func(err error) { + initDone <- err + if existingHook != nil { + existingHook(err) + } + } + + engine, err := v2.NewEngine(cfg) + if err != nil { + return fmt.Errorf("failed to create confidential workflow engine: %w", err) + } + + if err = engine.Start(ctx); err != nil { + return fmt.Errorf("failed to start confidential workflow engine: %w", err) + } + + select { + case <-ctx.Done(): + if closeErr := engine.Close(); closeErr != nil { + h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID) + } + return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err()) + case initErr := <-initDone: + if initErr != nil { + if closeErr := engine.Close(); closeErr != nil { + h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID) + } + return fmt.Errorf("engine initialization failed: %w", initErr) + } + } + + if err := h.engineRegistry.Add(wid, source, engine); err != nil { + if closeErr := engine.Close(); closeErr != nil { + return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err) + } + return fmt.Errorf("invariant violation: %w", err) + } + + return nil +} + // logCustMsg emits a custom message to the external sink and logs an error if that fails. func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { err := cma.Emit(ctx, msg) diff --git a/core/services/workflows/syncer/v2/handler_test.go b/core/services/workflows/syncer/v2/handler_test.go index 53441414ca3..320b57e586b 100644 --- a/core/services/workflows/syncer/v2/handler_test.go +++ b/core/services/workflows/syncer/v2/handler_test.go @@ -614,6 +614,264 @@ func Test_workflowRegisteredHandler(t *testing.T) { } } +func Test_workflowRegisteredHandler_confidentialRouting(t *testing.T) { + t.Run("confidential workflow bypasses engine factory and routes to confidential path", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + // Track whether the engine factory is called. The confidential path + // should bypass it entirely. + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"}]}`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + // The confidential path creates a real v2.Engine. With test data + // (non-hex owner), engine creation fails. The error comes from the + // confidential path, proving routing worked correctly. + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to create confidential workflow engine") + + // The engine factory must NOT have been called; the confidential path + // bypasses it. + assert.False(t, factoryCalled, "engine factory should not be called for confidential workflows") + + // The engine should NOT be in the registry since init failed. + _, ok := er.Get(giveWFID) + assert.False(t, ok, "engine should not be registered after failed init") + }) + + t.Run("non-confidential workflow uses engine factory", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + + factoryCalled := false + trackingFactory := func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) { + factoryCalled = true + if initDone != nil { + initDone <- nil + } + return &mockEngine{}, nil + } + + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(trackingFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + // No Attributes, or non-confidential attributes. + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + require.NoError(t, err) + + assert.True(t, factoryCalled, "engine factory should be called for non-confidential workflows") + + engine, ok := er.Get(giveWFID) + require.True(t, ok, "engine should be registered") + require.NoError(t, engine.Ready()) + }) + + t.Run("malformed attributes returns error", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + lf = limits.Factory{Logger: lggr} + db = pgtest.NewSqlxDB(t) + orm = artifacts.NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + wfOwner = []byte("0xOwner") + workflowEncryptionKey = workflowkey.MustNewXXXTestingOnly(big.NewInt(1)) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, "") + require.NoError(t, err) + wfIDString := hex.EncodeToString(giveWFID[:]) + + binaryURL := "http://example.com/" + wfIDString + "/binary" + configURL := "http://example.com/" + wfIDString + "/config" + signedURLParameter := "?auth=abc123" + signedBinaryURL := binaryURL + signedURLParameter + signedConfigURL := configURL + signedURLParameter + + fetcher := newMockFetcher(map[string]mockFetchResp{ + wfIDString + "-ARTIFACT_TYPE_BINARY": {Body: []byte(signedBinaryURL), Err: nil}, + wfIDString + "-ARTIFACT_TYPE_CONFIG": {Body: []byte(signedConfigURL), Err: nil}, + signedBinaryURL: {Body: encodedBinary, Err: nil}, + signedConfigURL: {Body: config, Err: nil}, + }) + artifactStore, err := artifacts.NewStore(lggr, orm, fetcher.FetcherFunc(), fetcher.RetrieverFunc(), clockwork.NewFakeClock(), workflowkey.Key{}, custmsg.NewLabeler(), lf, artifacts.WithConfig(artifacts.StoreConfig{ + ArtifactStorageHost: "example.com", + })) + require.NoError(t, err) + + er := NewEngineRegistry() + wfStore := store.NewInMemoryStore(lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + limiters, err := v2.NewLimiters(lf, nil) + require.NoError(t, err) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(lggr, syncerlimiter.Config{Global: 200, PerOwner: 200}, lf) + require.NoError(t, err) + + h, err := NewEventHandler(lggr, wfStore, nil, true, registry, er, emitter, limiters, rl, workflowLimits, artifactStore, workflowEncryptionKey, &testDonNotifier{}, + WithEngineRegistry(er), + WithEngineFactoryFn(mockEngineFactory), + ) + require.NoError(t, err) + servicetest.Run(t, h) + + event := WorkflowRegisteredEvent{ + Status: WorkflowStatusActive, + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + WorkflowTag: "workflow-tag", + BinaryURL: binaryURL, + ConfigURL: configURL, + Attributes: []byte(`{not valid json`), + } + + ctx = contexts.WithCRE(ctx, contexts.CRE{Owner: hex.EncodeToString(wfOwner), Workflow: wfIDString}) + err = h.workflowRegisteredEvent(ctx, event) + + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + type testCase struct { Name string BinaryURLFactory func(string) string diff --git a/core/services/workflows/v2/confidential_module.go b/core/services/workflows/v2/confidential_module.go new file mode 100644 index 00000000000..0f8e839a969 --- /dev/null +++ b/core/services/workflows/v2/confidential_module.go @@ -0,0 +1,185 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" +) + +const confidentialWorkflowsCapabilityID = "confidential-workflows@1.0.0-alpha" + +// WorkflowAttributes is the JSON structure stored in WorkflowSpec.Attributes. +type WorkflowAttributes struct { + Confidential bool `json:"confidential"` + VaultDonSecrets []SecretIdentifier `json:"vault_don_secrets"` +} + +// SecretIdentifier identifies a secret in VaultDON. +type SecretIdentifier struct { + Key string `json:"key"` + Namespace string `json:"namespace,omitempty"` +} + +// ParseWorkflowAttributes parses the Attributes JSON from a WorkflowSpec. +// Returns a zero-value struct if data is nil or empty. +func ParseWorkflowAttributes(data []byte) (WorkflowAttributes, error) { + var attrs WorkflowAttributes + if len(data) == 0 { + return attrs, nil + } + if err := json.Unmarshal(data, &attrs); err != nil { + return attrs, fmt.Errorf("failed to parse workflow attributes: %w", err) + } + return attrs, nil +} + +// IsConfidential returns true if the Attributes JSON has "confidential": true. +// Returns an error if the attributes contain malformed JSON, so callers can +// fail loudly rather than silently falling through to non-confidential execution. +func IsConfidential(data []byte) (bool, error) { + attrs, err := ParseWorkflowAttributes(data) + if err != nil { + return false, err + } + return attrs.Confidential, nil +} + +// ConfidentialModule implements host.ModuleV2 for confidential workflows. +// Instead of running WASM locally, it delegates execution to the +// confidential-workflows capability via the CapabilitiesRegistry. +type ConfidentialModule struct { + capRegistry core.CapabilitiesRegistry + binaryURL string + binaryHash []byte + workflowID string + workflowOwner string + workflowName string + workflowTag string + vaultDonSecrets []SecretIdentifier + lggr logger.Logger +} + +var _ host.ModuleV2 = (*ConfidentialModule)(nil) + +func NewConfidentialModule( + capRegistry core.CapabilitiesRegistry, + binaryURL string, + binaryHash []byte, + workflowID string, + workflowOwner string, + workflowName string, + workflowTag string, + vaultDonSecrets []SecretIdentifier, + lggr logger.Logger, +) *ConfidentialModule { + return &ConfidentialModule{ + capRegistry: capRegistry, + binaryURL: binaryURL, + binaryHash: binaryHash, + workflowID: workflowID, + workflowOwner: workflowOwner, + workflowName: workflowName, + workflowTag: workflowTag, + vaultDonSecrets: vaultDonSecrets, + lggr: lggr, + } +} + +func (m *ConfidentialModule) Start() {} +func (m *ConfidentialModule) Close() {} +func (m *ConfidentialModule) IsLegacyDAG() bool { return false } + +func (m *ConfidentialModule) Execute( + ctx context.Context, + request *sdkpb.ExecuteRequest, + _ host.ExecutionHelper, +) (*sdkpb.ExecutionResult, error) { + execReqBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal ExecuteRequest: %w", err) + } + + protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets)) + for i, s := range m.vaultDonSecrets { + // VaultDON treats "main" as the default namespace for secrets. + ns := s.Namespace + if ns == "" { + ns = "main" + } + protoSecrets[i] = &confworkflowtypes.SecretIdentifier{ + Key: s.Key, + Namespace: ns, + } + } + + capInput := &confworkflowtypes.ConfidentialWorkflowRequest{ + VaultDonSecrets: protoSecrets, + Execution: &confworkflowtypes.WorkflowExecution{ + WorkflowId: m.workflowID, + BinaryUrl: m.binaryURL, + BinaryHash: m.binaryHash, + ExecuteRequest: execReqBytes, + }, + } + + payload, err := anypb.New(capInput) + if err != nil { + return nil, fmt.Errorf("failed to marshal capability payload: %w", err) + } + + cap, err := m.capRegistry.GetExecutable(ctx, confidentialWorkflowsCapabilityID) + if err != nil { + return nil, fmt.Errorf("failed to get confidential-workflows capability: %w", err) + } + + capReq := capabilities.CapabilityRequest{ + Payload: payload, + Method: "Execute", + CapabilityId: confidentialWorkflowsCapabilityID, + Metadata: capabilities.RequestMetadata{ + WorkflowID: m.workflowID, + WorkflowOwner: m.workflowOwner, + WorkflowName: m.workflowName, + WorkflowTag: m.workflowTag, + }, + } + + capResp, err := cap.Execute(ctx, capReq) + if err != nil { + return nil, fmt.Errorf("confidential-workflows capability execution failed: %w", err) + } + + if capResp.Payload == nil { + return nil, fmt.Errorf("confidential-workflows capability returned nil payload") + } + + var confResp confworkflowtypes.ConfidentialWorkflowResponse + if err := capResp.Payload.UnmarshalTo(&confResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal capability response: %w", err) + } + + var result sdkpb.ExecutionResult + if err := proto.Unmarshal(confResp.ExecutionResult, &result); err != nil { + return nil, fmt.Errorf("failed to unmarshal ExecutionResult: %w", err) + } + + return &result, nil +} + +// ComputeBinaryHash returns the SHA-256 hash of the given binary. +func ComputeBinaryHash(binary []byte) []byte { + h := sha256.Sum256(binary) + return h[:] +} diff --git a/core/services/workflows/v2/confidential_module_test.go b/core/services/workflows/v2/confidential_module_test.go new file mode 100644 index 00000000000..95407f9efa2 --- /dev/null +++ b/core/services/workflows/v2/confidential_module_test.go @@ -0,0 +1,321 @@ +package v2 + +import ( + "context" + "crypto/sha256" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + regmocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + + confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow" + capmocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + valuespb "github.com/smartcontractkit/chainlink-protos/cre/go/values/pb" +) + +func TestParseWorkflowAttributes(t *testing.T) { + t.Run("valid JSON with all fields", func(t *testing.T) { + data := []byte(`{"confidential":true,"vault_don_secrets":[{"key":"API_KEY"},{"key":"SIGNING_KEY","namespace":"custom-ns"}]}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.True(t, attrs.Confidential) + require.Len(t, attrs.VaultDonSecrets, 2) + assert.Equal(t, "API_KEY", attrs.VaultDonSecrets[0].Key) + assert.Equal(t, "", attrs.VaultDonSecrets[0].Namespace) + assert.Equal(t, "SIGNING_KEY", attrs.VaultDonSecrets[1].Key) + assert.Equal(t, "custom-ns", attrs.VaultDonSecrets[1].Namespace) + }) + + t.Run("empty data returns zero value", func(t *testing.T) { + attrs, err := ParseWorkflowAttributes(nil) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + assert.Nil(t, attrs.VaultDonSecrets) + + attrs, err = ParseWorkflowAttributes([]byte{}) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("non-confidential workflow", func(t *testing.T) { + data := []byte(`{"confidential":false}`) + attrs, err := ParseWorkflowAttributes(data) + require.NoError(t, err) + assert.False(t, attrs.Confidential) + }) + + t.Run("malformed JSON returns error", func(t *testing.T) { + _, err := ParseWorkflowAttributes([]byte(`{not json}`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestIsConfidential(t *testing.T) { + t.Run("returns true for confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":true}`)) + require.NoError(t, err) + assert.True(t, ok) + }) + + t.Run("returns false for non-confidential", func(t *testing.T) { + ok, err := IsConfidential([]byte(`{"confidential":false}`)) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns false for empty data", func(t *testing.T) { + ok, err := IsConfidential(nil) + require.NoError(t, err) + assert.False(t, ok) + }) + + t.Run("returns error for malformed JSON", func(t *testing.T) { + _, err := IsConfidential([]byte(`broken`)) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse workflow attributes") + }) +} + +func TestComputeBinaryHash(t *testing.T) { + binary := []byte("hello world") + hash := ComputeBinaryHash(binary) + expected := sha256.Sum256(binary) + assert.Equal(t, expected[:], hash) + + // Deterministic: same input produces same hash. + assert.Equal(t, hash, ComputeBinaryHash(binary)) +} + +func TestConfidentialModule_Execute(t *testing.T) { + ctx := context.Background() + lggr := logger.Nop() + + // Build an ExecuteRequest to send through the module. + execReq := &sdkpb.ExecuteRequest{ + Config: []byte("test-config"), + } + + // Build the expected ExecutionResult that the enclave returns. + expectedResult := &sdkpb.ExecutionResult{ + Result: &sdkpb.ExecutionResult_Value{ + Value: valuespb.NewStringValue("enclave-output"), + }, + } + + // Serialize the result into a ConfidentialWorkflowResponse, as the capability would. + resultBytes, err := proto.Marshal(expectedResult) + require.NoError(t, err) + + confResp := &confworkflowtypes.ConfidentialWorkflowResponse{ + ExecutionResult: resultBytes, + } + respPayload, err := anypb.New(confResp) + require.NoError(t, err) + + t.Run("success", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + execCap.EXPECT().Execute(matches.AnyContext, mock.MatchedBy(func(req capabilities.CapabilityRequest) bool { + return req.Method == "Execute" && + req.CapabilityId == confidentialWorkflowsCapabilityID && + req.Metadata.WorkflowID == "wf-123" && + req.Metadata.WorkflowOwner == "owner-abc" && + req.Payload != nil + })).Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("fakehash"), + "wf-123", + "owner-abc", + "my-workflow", + "v1", + []SecretIdentifier{ + {Key: "API_KEY"}, + {Key: "SIGNING_KEY", Namespace: "custom-ns"}, + }, + lggr, + ) + + result, err := mod.Execute(ctx, execReq, nil) + require.NoError(t, err) + require.NotNil(t, result) + + val := result.GetValue() + require.NotNil(t, val) + assert.Equal(t, "enclave-output", val.GetStringValue()) + }) + + t.Run("default namespace is main", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + // Capture the request to inspect proto secrets. + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + mod := NewConfidentialModule( + capReg, + "https://example.com/binary.wasm", + []byte("hash"), + "wf-1", "owner", "name", "tag", + []SecretIdentifier{{Key: "SECRET_A"}}, // no namespace + lggr, + ) + + _, err := mod.Execute(ctx, execReq, nil) + require.NoError(t, err) + + // Unmarshal the captured request payload and verify namespace defaulted to "main". + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + require.Len(t, confReq.VaultDonSecrets, 1) + assert.Equal(t, "SECRET_A", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[0].GetNamespace()) + }) + + t.Run("GetExecutable error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(nil, fmt.Errorf("capability not found")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to get confidential-workflows capability") + }) + + t.Run("capability Execute error", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{}, fmt.Errorf("enclave unavailable")).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "confidential-workflows capability execution failed") + }) + + t.Run("nil payload in response", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Return(capabilities.CapabilityResponse{Payload: nil}, nil).Once() + + mod := NewConfidentialModule( + capReg, "", nil, "wf", "owner", "name", "tag", nil, lggr, + ) + + _, err := mod.Execute(ctx, execReq, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "returned nil payload") + }) + + t.Run("request fields are forwarded correctly", func(t *testing.T) { + capReg := regmocks.NewCapabilitiesRegistry(t) + execCap := capmocks.NewExecutableCapability(t) + + capReg.EXPECT().GetExecutable(matches.AnyContext, confidentialWorkflowsCapabilityID). + Return(execCap, nil).Once() + + var capturedReq capabilities.CapabilityRequest + execCap.EXPECT().Execute(matches.AnyContext, mock.Anything). + Run(func(_ context.Context, req capabilities.CapabilityRequest) { + capturedReq = req + }). + Return(capabilities.CapabilityResponse{Payload: respPayload}, nil).Once() + + binaryHash := ComputeBinaryHash([]byte("some-binary")) + mod := NewConfidentialModule( + capReg, + "https://example.com/wasm", + binaryHash, + "wf-abc", + "0xowner", + "my-workflow", + "v2", + []SecretIdentifier{ + {Key: "K1", Namespace: "ns1"}, + {Key: "K2"}, + }, + lggr, + ) + + _, err := mod.Execute(ctx, execReq, nil) + require.NoError(t, err) + + // Verify metadata. + assert.Equal(t, "Execute", capturedReq.Method) + assert.Equal(t, confidentialWorkflowsCapabilityID, capturedReq.CapabilityId) + assert.Equal(t, "wf-abc", capturedReq.Metadata.WorkflowID) + assert.Equal(t, "0xowner", capturedReq.Metadata.WorkflowOwner) + assert.Equal(t, "my-workflow", capturedReq.Metadata.WorkflowName) + assert.Equal(t, "v2", capturedReq.Metadata.WorkflowTag) + + // Verify payload contents. + var confReq confworkflowtypes.ConfidentialWorkflowRequest + require.NoError(t, capturedReq.Payload.UnmarshalTo(&confReq)) + + assert.Equal(t, "wf-abc", confReq.Execution.WorkflowId) + assert.Equal(t, "https://example.com/wasm", confReq.Execution.BinaryUrl) + assert.Equal(t, binaryHash, confReq.Execution.BinaryHash) + + // Verify the serialized ExecuteRequest round-trips. + var roundTripped sdkpb.ExecuteRequest + require.NoError(t, proto.Unmarshal(confReq.Execution.ExecuteRequest, &roundTripped)) + assert.Equal(t, execReq.GetConfig(), roundTripped.GetConfig()) + + // Verify secrets. + require.Len(t, confReq.VaultDonSecrets, 2) + assert.Equal(t, "K1", confReq.VaultDonSecrets[0].Key) + assert.Equal(t, "ns1", confReq.VaultDonSecrets[0].GetNamespace()) + assert.Equal(t, "K2", confReq.VaultDonSecrets[1].Key) + assert.Equal(t, "main", confReq.VaultDonSecrets[1].GetNamespace()) + }) +} + +func TestConfidentialModule_InterfaceMethods(t *testing.T) { + mod := &ConfidentialModule{} + + // These are no-ops but should not panic. + mod.Start() + mod.Close() + assert.False(t, mod.IsLegacyDAG()) +} diff --git a/core/store/migrate/migrations/0291_add_workflow_attributes_column.sql b/core/store/migrate/migrations/0291_add_workflow_attributes_column.sql new file mode 100644 index 00000000000..30c85db7289 --- /dev/null +++ b/core/store/migrate/migrations/0291_add_workflow_attributes_column.sql @@ -0,0 +1,5 @@ +-- +goose Up +ALTER TABLE workflow_specs_v2 ADD COLUMN attributes bytea DEFAULT ''; + +-- +goose Down +ALTER TABLE workflow_specs_v2 DROP COLUMN attributes; diff --git a/go.mod b/go.mod index 9bf4d88a2d0..55646fde4da 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260121163256-85accaf3d28d github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250912190424-fd2e35d7deb5 github.com/smartcontractkit/chainlink-ccv v0.0.0-20260224105024-807568ff394d - github.com/smartcontractkit/chainlink-common v0.10.1-0.20260223203940-c4713ecec7d4 + github.com/smartcontractkit/chainlink-common v0.10.1-0.20260224225419-177ddc60abbe github.com/smartcontractkit/chainlink-common/keystore v1.0.2 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 github.com/smartcontractkit/chainlink-data-streams v0.1.12-0.20260224150904-47eef37d9005 @@ -96,7 +96,7 @@ require ( github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20251210101658-1c5c8e4c4f15 github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20251021173435-e86785845942 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260224231504-2fedc0c56894 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0 github.com/smartcontractkit/chainlink-protos/ring/go v0.0.0-20260128151123-605e9540b706 diff --git a/go.sum b/go.sum index 6da0bbe6c95..6f0b378fddf 100644 --- a/go.sum +++ b/go.sum @@ -1180,8 +1180,8 @@ github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250 github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250912190424-fd2e35d7deb5/go.mod h1:xtZNi6pOKdC3sLvokDvXOhgHzT+cyBqH/gWwvxTxqrg= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260224105024-807568ff394d h1:eTMWOP3Q91Qp7b+D2nuh9k2WHct57tBxhn+524YFJq0= github.com/smartcontractkit/chainlink-ccv v0.0.0-20260224105024-807568ff394d/go.mod h1:RnuNcn7DZmjmzEkeEWX0uL5y1oslB3c9URPLOjFU+jE= -github.com/smartcontractkit/chainlink-common v0.10.1-0.20260223203940-c4713ecec7d4 h1:plpa9pAG7YqyeKnxX+KqqzfBBTDK7mMndVNBHP1yMpU= -github.com/smartcontractkit/chainlink-common v0.10.1-0.20260223203940-c4713ecec7d4/go.mod h1:HXgSKzmZ/bhSx8nHU7hHW6dR+BHSXkdcpFv2T8qJcS8= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260224225419-177ddc60abbe h1:+rlkA44P9DTSM9tMhMEMbqKH2RbeMau3PasDZ/keFog= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260224225419-177ddc60abbe/go.mod h1:ASljsyuwcpHzLTCAoqhAiZyPAA8z6BWCXebnUKVuVbc= github.com/smartcontractkit/chainlink-common/keystore v1.0.2 h1:AWisx4JT3QV8tcgh6J5NCrex+wAgTYpWyHsyNPSXzsQ= github.com/smartcontractkit/chainlink-common/keystore v1.0.2/go.mod h1:rSkIHdomyak3YnUtXLenl6poIq8q0V3UZPiiyYqPdGA= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= @@ -1216,8 +1216,8 @@ github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0. github.com/smartcontractkit/chainlink-protos/chainlink-ccv/message-discovery v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:ATjAPIVJibHRcIfiG47rEQkUIOoYa6KDvWj3zwCAw6g= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d h1:AJy55QJ/pBhXkZjc7N+ATnWfxrcjq9BI9DmdtdjwDUQ= github.com/smartcontractkit/chainlink-protos/chainlink-ccv/verifier v0.0.0-20251211142334-5c3421fe2c8d/go.mod h1:5JdppgngCOUS76p61zCinSCgOhPeYQ+OcDUuome5THQ= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f h1:MHlgzqiDPyDV397bZkzS9TtWXb3FR9Pb8FR9cP9h0As= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260224231504-2fedc0c56894 h1:F7DkprQU5szdkOaFDL7iI8WuQS+AJBdQHKfLH0iga/k= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260224231504-2fedc0c56894/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260211172625-dff40e83b3c9 h1:hhevsu8k7tlDRrYZmgAh7V4avGQDMvus1bwIlial3Ps= diff --git a/plugins/plugins.private.yaml b/plugins/plugins.private.yaml index f16c0d8d2c4..599b5a8cab9 100644 --- a/plugins/plugins.private.yaml +++ b/plugins/plugins.private.yaml @@ -55,3 +55,7 @@ plugins: - moduleURI: "github.com/smartcontractkit/confidential-compute/enclave/apps/confidential-http/capability" gitRef: "6e03ab2b759f6a6983673e4071b712438b1c923c" installPath: "./cmd/confidential-http" + confidential-workflows: + - moduleURI: "github.com/smartcontractkit/confidential-compute/enclave/apps/confidential-workflows/capability" + gitRef: "47f393dc95c72fd048b7ff39e18537a777366b5d" + installPath: "./cmd/confidential-workflows"