Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
0cde0b1
feat(control-plane): add VC-based authorization foundation
AbirAbbas Feb 4, 2026
0106624
feat(vc-authorization): complete VC-based authorization system implem…
AbirAbbas Feb 5, 2026
c0f1d57
fix(control-plane): fix pre-existing test bugs exposed by FTS5 build tag
AbirAbbas Feb 5, 2026
6119f8f
fix(control-plane): fix pre-existing test bugs exposed by FTS5 build tag
AbirAbbas Feb 5, 2026
9fe8858
fix(control-plane): fix SSE tests leaking goroutines via incorrect co…
AbirAbbas Feb 5, 2026
8a4c3f3
ts sdk and bug fix on did web
AbirAbbas Feb 5, 2026
f90a875
feat(examples): add permission test agents and enable VC authorizatio…
AbirAbbas Feb 5, 2026
20bda32
Fixes
AbirAbbas Feb 6, 2026
393e519
fix(sdk-python): update test fakes for DID credential wiring in _regi…
AbirAbbas Feb 6, 2026
39a1954
more improvements
AbirAbbas Feb 6, 2026
8e671fb
6th iteration of fixes
AbirAbbas Feb 6, 2026
b83f5be
end to end tested
AbirAbbas Feb 6, 2026
8396b29
feat(sdk): add Go & TS permission test agents, fix DID auth signing
AbirAbbas Feb 6, 2026
026e4e2
fix(sdk-ts): update header-forwarding test for pre-serialized JSON body
AbirAbbas Feb 6, 2026
54aa126
manual testing updates
AbirAbbas Feb 6, 2026
c5e5556
fix(vc-auth): fix re-approval deadlock, empty caller_agent_id, and er…
AbirAbbas Feb 7, 2026
cbbcd40
fix go missing func
AbirAbbas Feb 8, 2026
e030d81
address dx changes
AbirAbbas Feb 9, 2026
7095941
temp
AbirAbbas Feb 9, 2026
668ed79
more fixes
AbirAbbas Feb 10, 2026
0cac1c2
finalized
AbirAbbas Feb 10, 2026
5df611a
better error prop
AbirAbbas Feb 10, 2026
fa18933
fix: update TS DID auth tests to match nonce-based signing format
AbirAbbas Feb 10, 2026
6ffe576
fix: add rate limiting to DID auth middleware on execution endpoints
AbirAbbas Feb 10, 2026
5cdfdf8
fix: use express-rate-limit for DID auth middleware to satisfy CodeQL
AbirAbbas Feb 10, 2026
27455d7
fix: remove duplicate countWorkflowRuns method from rebase
AbirAbbas Feb 10, 2026
ccba62a
UI cleanup
AbirAbbas Feb 10, 2026
6a09ce0
pydantic formatting fix
AbirAbbas Feb 11, 2026
3d6a50b
connector changes
AbirAbbas Feb 12, 2026
9db17be
implemented multi agents with versioning
AbirAbbas Feb 13, 2026
4ac437f
feat(ui): polished authorization page with unified tabs and visual st…
santoshkumarradha Feb 13, 2026
70a5072
Merge branch 'feat/vc-authorization' into feat/connector
AbirAbbas Feb 13, 2026
9ae4e62
multi versioning connector setup
AbirAbbas Feb 13, 2026
d89eb23
add agent to agent direct checks
AbirAbbas Feb 17, 2026
234c441
Merge branch 'feat/vc-authorization' into feat/connector
AbirAbbas Feb 18, 2026
1a6c32a
bugfixes on connector
AbirAbbas Feb 20, 2026
8d25a95
QA fixes
AbirAbbas Feb 20, 2026
c248e89
package lock
AbirAbbas Feb 21, 2026
34f1c74
Merge main into feat/vc-authorization
AbirAbbas Feb 21, 2026
f9d30dd
Merge branch 'feat/vc-authorization' into feat/connector
AbirAbbas Feb 23, 2026
f9d9dcf
bug fixes on permissions & versioning flow
AbirAbbas Feb 23, 2026
380dcb2
Merge main into feat/connector
AbirAbbas Feb 25, 2026
4f7fe7a
fix: add missing DeleteAgentVersion stub and guard postgres migration…
AbirAbbas Feb 25, 2026
a75e963
add postgres testing to dev
AbirAbbas Feb 25, 2026
2781ae8
wait flow
AbirAbbas Feb 25, 2026
726cdfe
improvements
AbirAbbas Feb 27, 2026
f7a4a4d
bugfix on reasoner path
AbirAbbas Mar 2, 2026
f427b9b
reasoner name mismatch fix
AbirAbbas Mar 2, 2026
751bc79
fix skill name mismatch bug
AbirAbbas Mar 2, 2026
c88326d
Merge origin/main into feat/waiting-state
AbirAbbas Mar 2, 2026
14ed059
fix: update test to include approval_expires_at column
AbirAbbas Mar 2, 2026
cbd692f
fix: remove unused httpx import in test_approval.py
AbirAbbas Mar 2, 2026
6d68959
fix: resolve Python and TypeScript SDK test failures
AbirAbbas Mar 2, 2026
fd8912b
fix: update test URL to match reasoner name-based endpoint path
AbirAbbas Mar 2, 2026
b6f881b
add examples for waiting state
AbirAbbas Mar 2, 2026
f2d007a
fix: resolve Gin route parameter conflict between waiting-state and t…
AbirAbbas Mar 2, 2026
6af58e4
fix tests
AbirAbbas Mar 2, 2026
6146f4b
fix: correct async endpoint URLs and assertion in waiting state funct…
AbirAbbas Mar 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions control-plane/config/agentfield.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions control-plane/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ type AgentFieldConfig struct {
NodeHealth NodeHealthConfig `yaml:"node_health" mapstructure:"node_health"`
ExecutionCleanup ExecutionCleanupConfig `yaml:"execution_cleanup" mapstructure:"execution_cleanup"`
ExecutionQueue ExecutionQueueConfig `yaml:"execution_queue" mapstructure:"execution_queue"`
Approval ApprovalConfig `yaml:"approval" mapstructure:"approval"`
}

// ApprovalConfig holds configuration for the execution approval workflow.
// The control plane manages execution state only — agents are responsible for
// communicating with external approval services (e.g. hax-sdk).
type ApprovalConfig struct {
WebhookSecret string `yaml:"webhook_secret" mapstructure:"webhook_secret"` // Optional HMAC-SHA256 secret for verifying webhook callbacks
DefaultExpiryHours int `yaml:"default_expiry_hours" mapstructure:"default_expiry_hours"` // Default approval expiry (hours); 0 = 72h
}

// NodeHealthConfig holds configuration for agent node health monitoring.
Expand Down Expand Up @@ -303,6 +312,16 @@ func applyEnvOverrides(cfg *Config) {
cfg.Features.DID.Authorization.InternalToken = val
}

// Approval workflow overrides
if val := os.Getenv("AGENTFIELD_APPROVAL_WEBHOOK_SECRET"); val != "" {
cfg.AgentField.Approval.WebhookSecret = val
}
if val := os.Getenv("AGENTFIELD_APPROVAL_DEFAULT_EXPIRY_HOURS"); val != "" {
if i, err := strconv.Atoi(val); err == nil {
cfg.AgentField.Approval.DefaultExpiryHours = i
}
}

// Connector overrides
if val := os.Getenv("AGENTFIELD_CONNECTOR_ENABLED"); val != "" {
cfg.Features.Connector.Enabled = val == "true" || val == "1"
Expand Down
40 changes: 35 additions & 5 deletions control-plane/internal/events/execution_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
type ExecutionEventType string

const (
ExecutionCreated ExecutionEventType = "execution_created"
ExecutionStarted ExecutionEventType = "execution_started"
ExecutionUpdated ExecutionEventType = "execution_updated"
ExecutionCompleted ExecutionEventType = "execution_completed"
ExecutionFailed ExecutionEventType = "execution_failed"
ExecutionCreated ExecutionEventType = "execution_created"
ExecutionStarted ExecutionEventType = "execution_started"
ExecutionUpdated ExecutionEventType = "execution_updated"
ExecutionCompleted ExecutionEventType = "execution_completed"
ExecutionFailed ExecutionEventType = "execution_failed"
ExecutionWaiting ExecutionEventType = "execution_waiting"
ExecutionApprovalResolved ExecutionEventType = "execution_approval_resolved"
)

// ExecutionEvent represents an execution state change event
Expand Down Expand Up @@ -177,3 +179,31 @@ func PublishExecutionFailed(executionID, workflowID, agentNodeID string, data in
}
GlobalExecutionEventBus.Publish(event)
}

// PublishExecutionWaiting publishes an event when an execution enters the waiting state.
func PublishExecutionWaiting(executionID, workflowID, agentNodeID string, data interface{}) {
event := ExecutionEvent{
Type: ExecutionWaiting,
ExecutionID: executionID,
WorkflowID: workflowID,
AgentNodeID: agentNodeID,
Status: types.ExecutionStatusWaiting,
Timestamp: time.Now(),
Data: data,
}
GlobalExecutionEventBus.Publish(event)
}

// PublishExecutionApprovalResolved publishes an event when an approval decision is received.
func PublishExecutionApprovalResolved(executionID, workflowID, agentNodeID, newStatus string, data interface{}) {
event := ExecutionEvent{
Type: ExecutionApprovalResolved,
ExecutionID: executionID,
WorkflowID: workflowID,
AgentNodeID: agentNodeID,
Status: newStatus,
Timestamp: time.Now(),
Data: data,
}
GlobalExecutionEventBus.Publish(event)
}
147 changes: 138 additions & 9 deletions control-plane/internal/handlers/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type ExecutionStore interface {
StoreWorkflowExecution(ctx context.Context, execution *types.WorkflowExecution) error
UpdateWorkflowExecution(ctx context.Context, executionID string, updateFunc func(*types.WorkflowExecution) (*types.WorkflowExecution, error)) error
GetWorkflowExecution(ctx context.Context, executionID string) (*types.WorkflowExecution, error)
QueryWorkflowExecutions(ctx context.Context, filters types.WorkflowExecutionFilters) ([]*types.WorkflowExecution, error)
StoreWorkflowExecutionEvent(ctx context.Context, event *types.WorkflowExecutionEvent) error
GetExecutionEventBus() *events.ExecutionEventBus
}

Expand Down Expand Up @@ -88,6 +90,7 @@ type ExecutionStatusResponse struct {
ExecutionID string `json:"execution_id"`
RunID string `json:"run_id"`
Status string `json:"status"`
StatusReason *string `json:"status_reason,omitempty"`
Result interface{} `json:"result,omitempty"`
Error *string `json:"error,omitempty"`
ErrorDetails interface{} `json:"error_details,omitempty"`
Expand All @@ -96,6 +99,10 @@ type ExecutionStatusResponse struct {
DurationMS *int64 `json:"duration_ms,omitempty"`
WebhookRegistered bool `json:"webhook_registered"`
WebhookEvents []*types.ExecutionWebhookEvent `json:"webhook_events,omitempty"`
// Approval fields (populated when execution has an active approval request)
ApprovalRequestID *string `json:"approval_request_id,omitempty"`
ApprovalStatus *string `json:"approval_status,omitempty"`
ApprovalRequestURL *string `json:"approval_request_url,omitempty"`
}

// BatchStatusRequest allows the UI to fetch multiple execution statuses at once.
Expand All @@ -107,12 +114,13 @@ type BatchStatusRequest struct {
type BatchStatusResponse map[string]ExecutionStatusResponse

type executionStatusUpdateRequest struct {
Status string `json:"status" binding:"required"`
Result map[string]interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
DurationMS *int64 `json:"duration_ms,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Progress *int `json:"progress,omitempty"`
Status string `json:"status" binding:"required"`
StatusReason *string `json:"status_reason,omitempty"`
Result map[string]interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
DurationMS *int64 `json:"duration_ms,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Progress *int `json:"progress,omitempty"`
}

type executionController struct {
Expand Down Expand Up @@ -411,7 +419,7 @@ func (c *executionController) handleStatus(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, renderStatus(exec))
ctx.JSON(http.StatusOK, c.renderStatusWithApproval(reqCtx, exec))
}

func (c *executionController) handleBatchStatus(ctx *gin.Context) {
Expand Down Expand Up @@ -440,7 +448,7 @@ func (c *executionController) handleBatchStatus(ctx *gin.Context) {
}
continue
}
response[id] = renderStatus(exec)
response[id] = c.renderStatusWithApproval(reqCtx, exec)
}

ctx.JSON(http.StatusOK, response)
Expand Down Expand Up @@ -488,7 +496,29 @@ func (c *executionController) handleStatusUpdate(ctx *gin.Context) {
return nil, fmt.Errorf("execution %s not found", executionID)
}

// Guard: executions in "waiting" state can only transition to
// running, cancelled, or failed. The approval webhook handler
// manages the waiting→running transition; direct jumps to
// succeeded or timeout would desync the executions and
// workflow_executions tables.
if current.Status == types.ExecutionStatusWaiting {
switch normalizedStatus {
case string(types.ExecutionStatusRunning),
string(types.ExecutionStatusCancelled),
string(types.ExecutionStatusFailed):
// allowed
default:
logger.Logger.Warn().
Str("execution_id", executionID).
Str("current_status", string(current.Status)).
Str("requested_status", normalizedStatus).
Msg("rejecting status update: execution is waiting for approval")
return nil, fmt.Errorf("execution %s is in 'waiting' state; only running, cancelled, or failed transitions are allowed", executionID)
}
}

current.Status = normalizedStatus
current.StatusReason = req.StatusReason
if len(resultBytes) > 0 {
current.ResultPayload = json.RawMessage(resultBytes)
current.ResultURI = resultURI
Expand Down Expand Up @@ -547,6 +577,8 @@ func (c *executionController) handleStatusUpdate(ctx *gin.Context) {
elapsed = time.Duration(*updated.DurationMS) * time.Millisecond
}

c.updateWorkflowExecutionStatus(reqCtx, executionID, normalizedStatus, req.StatusReason)

if isTerminal {
c.updateWorkflowExecutionFinalState(reqCtx, executionID, types.ExecutionStatus(normalizedStatus), updated.ResultPayload, elapsed, errorMsg)
if updated.WebhookRegistered {
Expand All @@ -559,12 +591,63 @@ func (c *executionController) handleStatusUpdate(ctx *gin.Context) {
"error": req.Error,
"progress": req.Progress,
}
if req.StatusReason != nil && strings.TrimSpace(*req.StatusReason) != "" {
eventData["status_reason"] = strings.TrimSpace(*req.StatusReason)
}
if inputPayload := decodeJSON(updated.InputPayload); inputPayload != nil {
eventData["input"] = inputPayload
}
c.publishExecutionEvent(updated, normalizedStatus, eventData)

ctx.JSON(http.StatusOK, renderStatus(updated))
ctx.JSON(http.StatusOK, c.renderStatusWithApproval(reqCtx, updated))
}

func (c *executionController) updateWorkflowExecutionStatus(
ctx context.Context,
executionID string,
status string,
statusReason *string,
) {
if c.store == nil {
return
}

var normalizedReason *string
if statusReason != nil {
trimmed := strings.TrimSpace(*statusReason)
if trimmed != "" {
normalizedReason = &trimmed
}
}

err := c.store.UpdateWorkflowExecution(ctx, executionID, func(current *types.WorkflowExecution) (*types.WorkflowExecution, error) {
if current == nil {
return nil, fmt.Errorf("execution with ID %s not found", executionID)
}

current.Status = status
current.StatusReason = normalizedReason
current.UpdatedAt = time.Now().UTC()

if !types.IsTerminalExecutionStatus(status) {
current.CompletedAt = nil
if current.DurationMS != nil {
current.DurationMS = nil
}
}

return current, nil
})
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "not found") {
return
}
logger.Logger.Error().
Err(err).
Str("execution_id", executionID).
Str("status", status).
Msg("failed to update workflow execution status")
}
}

func (c *executionController) publishExecutionEvent(exec *types.Execution, status string, data map[string]interface{}) {
Expand Down Expand Up @@ -1052,11 +1135,22 @@ func (c *executionController) completeExecution(ctx context.Context, plan *prepa
resultURI := c.savePayload(ctx, result)

var lastErr error
var alreadyCancelled bool
for attempt := 0; attempt < 5; attempt++ {
updated, err := c.store.UpdateExecutionRecord(ctx, plan.exec.ExecutionID, func(current *types.Execution) (*types.Execution, error) {
if current == nil {
return nil, fmt.Errorf("execution %s not found", plan.exec.ExecutionID)
}
// Guard: don't overwrite if already cancelled (e.g. by approval rejection webhook)
// or waiting for approval — the approval webhook handler manages the transition.
if current.Status == types.ExecutionStatusCancelled || current.Status == types.ExecutionStatusWaiting {
logger.Logger.Info().
Str("execution_id", plan.exec.ExecutionID).
Str("current_status", string(current.Status)).
Msg("skipping completion update; execution already cancelled or waiting for approval")
alreadyCancelled = true
return current, nil
}
now := time.Now().UTC()
current.Status = types.ExecutionStatusSucceeded
current.ResultPayload = json.RawMessage(result)
Expand All @@ -1069,6 +1163,9 @@ func (c *executionController) completeExecution(ctx context.Context, plan *prepa
return current, nil
})
if err == nil {
if alreadyCancelled {
return nil
}
c.updateWorkflowExecutionFinalState(
ctx,
plan.exec.ExecutionID,
Expand Down Expand Up @@ -1104,11 +1201,22 @@ func (c *executionController) failExecution(ctx context.Context, plan *preparedE
errMsg := callErr.Error()
resultURI := c.savePayload(ctx, result)
var lastErr error
var alreadyCancelled bool
for attempt := 0; attempt < 5; attempt++ {
updated, err := c.store.UpdateExecutionRecord(ctx, plan.exec.ExecutionID, func(current *types.Execution) (*types.Execution, error) {
if current == nil {
return nil, fmt.Errorf("execution %s not found", plan.exec.ExecutionID)
}
// Guard: don't overwrite if already cancelled (e.g. by approval rejection webhook)
// or waiting for approval — the approval webhook handler manages the transition.
if current.Status == types.ExecutionStatusCancelled || current.Status == types.ExecutionStatusWaiting {
logger.Logger.Info().
Str("execution_id", plan.exec.ExecutionID).
Str("current_status", string(current.Status)).
Msg("skipping failure update; execution already cancelled or waiting for approval")
alreadyCancelled = true
return current, nil
}
now := time.Now().UTC()
current.Status = types.ExecutionStatusFailed
current.ErrorMessage = &errMsg
Expand All @@ -1123,6 +1231,9 @@ func (c *executionController) failExecution(ctx context.Context, plan *preparedE
return current, nil
})
if err == nil {
if alreadyCancelled {
return nil
}
c.updateWorkflowExecutionFinalState(
ctx,
plan.exec.ExecutionID,
Expand Down Expand Up @@ -1457,6 +1568,7 @@ func renderStatus(exec *types.Execution) ExecutionStatusResponse {
ExecutionID: exec.ExecutionID,
RunID: exec.RunID,
Status: exec.Status,
StatusReason: exec.StatusReason,
Result: decodeJSON(exec.ResultPayload),
Error: exec.ErrorMessage,
StartedAt: exec.StartedAt.UTC().Format(time.RFC3339),
Expand All @@ -1473,6 +1585,23 @@ func renderStatus(exec *types.Execution) ExecutionStatusResponse {
return resp
}

// renderStatusWithApproval enriches the base status response with approval
// fields from the corresponding WorkflowExecution record, if one exists.
func (c *executionController) renderStatusWithApproval(ctx context.Context, exec *types.Execution) ExecutionStatusResponse {
resp := renderStatus(exec)

// Best-effort enrichment — if the lookup fails we still return the base response.
wfExec, err := c.store.GetWorkflowExecution(ctx, exec.ExecutionID)
if err != nil || wfExec == nil {
return resp
}

resp.ApprovalRequestID = wfExec.ApprovalRequestID
resp.ApprovalStatus = wfExec.ApprovalStatus
resp.ApprovalRequestURL = wfExec.ApprovalRequestURL
return resp
}

func (c *executionController) ensureWorkflowExecutionRecord(ctx context.Context, exec *types.Execution, target *parsedTarget, payload []byte) {
workflowExec := c.buildWorkflowExecutionRecord(ctx, exec, target, payload)
if workflowExec == nil {
Expand Down
Loading
Loading