From 32743dcca1977f49807c23f5c5965331a5704e25 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 4 Mar 2026 17:32:40 +0000 Subject: [PATCH 1/3] runtimestate: Remove stalled state for non-stalled runtimes Fix the stalled state being set for runtimes that were stalled at one point but are no longer stalled. Signed-off-by: joshvanl --- backend/runtimestate/runtimestate.go | 12 +- backend/runtimestate/runtimestate_test.go | 184 ++++++++++++++++++++++ 2 files changed, 193 insertions(+), 3 deletions(-) create mode 100644 backend/runtimestate/runtimestate_test.go diff --git a/backend/runtimestate/runtimestate.go b/backend/runtimestate/runtimestate.go index 83f7733..a862b61 100644 --- a/backend/runtimestate/runtimestate.go +++ b/backend/runtimestate/runtimestate.go @@ -52,15 +52,21 @@ func addEvent(s *protos.OrchestrationRuntimeState, e *protos.HistoryEvent, isNew s.IsSuspended = true } else if e.GetExecutionResumed() != nil { s.IsSuspended = false - } else if e.GetExecutionStalled() != nil { + } else if stalledEvent := e.GetExecutionStalled(); stalledEvent != nil { s.Stalled = &protos.RuntimeStateStalled{ - Reason: e.GetExecutionStalled().Reason, - Description: e.GetExecutionStalled().Description, + Reason: stalledEvent.Reason, + Description: stalledEvent.Description, } } else { // TODO: Check for other possible duplicates using task IDs } + // Any successfully processed event clears a prior stalled state, unless + // the event itself is a stalled event. + if e.GetExecutionStalled() == nil { + s.Stalled = nil + } + if isNew { s.NewEvents = append(s.NewEvents, e) } else { diff --git a/backend/runtimestate/runtimestate_test.go b/backend/runtimestate/runtimestate_test.go new file mode 100644 index 0000000..77ff01d --- /dev/null +++ b/backend/runtimestate/runtimestate_test.go @@ -0,0 +1,184 @@ +package runtimestate + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/dapr/durabletask-go/api/protos" +) + +func startedEvent() *protos.HistoryEvent { + return &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_ExecutionStarted{ + ExecutionStarted: &protos.ExecutionStartedEvent{ + Name: "test-orchestrator", + }, + }, + } +} + +func stalledEvent(reason protos.StalledReason, description string) *protos.HistoryEvent { + return &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_ExecutionStalled{ + ExecutionStalled: &protos.ExecutionStalledEvent{ + Reason: reason, + Description: proto.String(description), + }, + }, + } +} + +func TestAddEvent_StalledClearedOnNewEvent(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + newEvent *protos.HistoryEvent + }{ + { + name: "stalled cleared by ExecutionSuspended", + newEvent: &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_ExecutionSuspended{ + ExecutionSuspended: &protos.ExecutionSuspendedEvent{}, + }, + }, + }, + { + name: "stalled cleared by ExecutionResumed", + newEvent: &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_ExecutionResumed{ + ExecutionResumed: &protos.ExecutionResumedEvent{}, + }, + }, + }, + { + name: "stalled cleared by TaskScheduled", + newEvent: &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_TaskScheduled{ + TaskScheduled: &protos.TaskScheduledEvent{ + Name: "test-activity", + }, + }, + }, + }, + { + name: "stalled cleared by ExecutionCompleted", + newEvent: &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_ExecutionCompleted{ + ExecutionCompleted: &protos.ExecutionCompletedEvent{ + OrchestrationStatus: protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + s := NewOrchestrationRuntimeState("test-instance", nil, nil) + + // Add a start event so RuntimeStatus can reach the stalled check. + require.NoError(t, AddEvent(s, startedEvent())) + + // Add the stalled event. + require.NoError(t, AddEvent(s, stalledEvent(protos.StalledReason_PATCH_MISMATCH, "test stall"))) + require.NotNil(t, s.Stalled, "expected Stalled to be set after stalled event") + assert.Equal(t, protos.StalledReason_PATCH_MISMATCH, s.Stalled.Reason) + assert.Equal(t, "test stall", s.Stalled.GetDescription()) + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_STALLED, RuntimeStatus(s)) + + // Add another event which should clear the stalled state. + require.NoError(t, AddEvent(s, tt.newEvent)) + assert.Nil(t, s.Stalled, "expected Stalled to be cleared after new event") + assert.NotEqual(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_STALLED, RuntimeStatus(s)) + }) + } +} + +func TestAddEvent_StalledSetFromOldEvents(t *testing.T) { + t.Parallel() + + s := NewOrchestrationRuntimeState("test-instance", nil, []*protos.HistoryEvent{ + startedEvent(), + stalledEvent(protos.StalledReason_VERSION_NOT_AVAILABLE, "old stall"), + }) + require.NotNil(t, s.Stalled) + assert.Equal(t, protos.StalledReason_VERSION_NOT_AVAILABLE, s.Stalled.Reason) + assert.Equal(t, "old stall", s.Stalled.GetDescription()) + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_STALLED, RuntimeStatus(s)) +} + +func TestAddEvent_StalledClearedBySubsequentOldEvent(t *testing.T) { + t.Parallel() + + taskScheduled := &protos.HistoryEvent{ + EventId: -1, + Timestamp: timestamppb.Now(), + EventType: &protos.HistoryEvent_TaskScheduled{ + TaskScheduled: &protos.TaskScheduledEvent{ + Name: "test-activity", + }, + }, + } + + // If the history contains a stalled event followed by another event, + // the stalled state should be cleared. + s := NewOrchestrationRuntimeState("test-instance", nil, []*protos.HistoryEvent{ + startedEvent(), + stalledEvent(protos.StalledReason_PATCH_MISMATCH, "stalled"), + taskScheduled, + }) + assert.Nil(t, s.Stalled, "expected Stalled to be cleared by subsequent old event") + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING, RuntimeStatus(s)) +} + +func TestAddEvent_StalledPreservedOnDuplicateError(t *testing.T) { + t.Parallel() + + s := NewOrchestrationRuntimeState("test-instance", nil, nil) + require.NoError(t, AddEvent(s, startedEvent())) + require.NoError(t, AddEvent(s, stalledEvent(protos.StalledReason_PATCH_MISMATCH, "stalled"))) + require.NotNil(t, s.Stalled) + + // A duplicate ExecutionStarted should return an error and NOT clear + // the stalled state. + err := AddEvent(s, startedEvent()) + require.ErrorIs(t, err, ErrDuplicateEvent) + assert.NotNil(t, s.Stalled, "expected Stalled to be preserved on error") + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_STALLED, RuntimeStatus(s)) +} + +func TestAddEvent_StalledReplacedByNewStalled(t *testing.T) { + t.Parallel() + + s := NewOrchestrationRuntimeState("test-instance", nil, nil) + require.NoError(t, AddEvent(s, startedEvent())) + + require.NoError(t, AddEvent(s, stalledEvent(protos.StalledReason_PATCH_MISMATCH, "first stall"))) + require.NotNil(t, s.Stalled) + assert.Equal(t, "first stall", s.Stalled.GetDescription()) + + // A second stalled event should first clear, then set the new stalled state. + require.NoError(t, AddEvent(s, stalledEvent(protos.StalledReason_VERSION_NOT_AVAILABLE, "second stall"))) + require.NotNil(t, s.Stalled) + assert.Equal(t, protos.StalledReason_VERSION_NOT_AVAILABLE, s.Stalled.Reason) + assert.Equal(t, "second stall", s.Stalled.GetDescription()) +} From 6a0cbbdd059f65530ac675b3e9b0c7b17389fbdd Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Wed, 4 Mar 2026 17:49:29 +0000 Subject: [PATCH 2/3] Update backend/runtimestate/runtimestate_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Josh van Leeuwen --- backend/runtimestate/runtimestate_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/runtimestate/runtimestate_test.go b/backend/runtimestate/runtimestate_test.go index 77ff01d..0639242 100644 --- a/backend/runtimestate/runtimestate_test.go +++ b/backend/runtimestate/runtimestate_test.go @@ -90,6 +90,7 @@ func TestAddEvent_StalledClearedOnNewEvent(t *testing.T) { } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() From 205ec9d93500d3f28784ef07a9f7b1876ce0d0b2 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Wed, 4 Mar 2026 17:49:48 +0000 Subject: [PATCH 3/3] Update backend/runtimestate/runtimestate_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Josh van Leeuwen --- backend/runtimestate/runtimestate_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/runtimestate/runtimestate_test.go b/backend/runtimestate/runtimestate_test.go index 0639242..5b3367b 100644 --- a/backend/runtimestate/runtimestate_test.go +++ b/backend/runtimestate/runtimestate_test.go @@ -177,7 +177,7 @@ func TestAddEvent_StalledReplacedByNewStalled(t *testing.T) { require.NotNil(t, s.Stalled) assert.Equal(t, "first stall", s.Stalled.GetDescription()) - // A second stalled event should first clear, then set the new stalled state. + // A second stalled event should replace the existing stalled state with the new one. require.NoError(t, AddEvent(s, stalledEvent(protos.StalledReason_VERSION_NOT_AVAILABLE, "second stall"))) require.NotNil(t, s.Stalled) assert.Equal(t, protos.StalledReason_VERSION_NOT_AVAILABLE, s.Stalled.Reason)