Skip to content

Commit a9feed0

Browse files
committed
Guard against stuck orchestration, implement abandon
1 parent a99dac2 commit a9feed0

File tree

1 file changed

+102
-53
lines changed

1 file changed

+102
-53
lines changed

durabletask/testing/in_memory_backend.py

Lines changed: 102 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class EntityState:
6666
last_modified_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
6767
locked_by: Optional[str] = None
6868
pending_operations: list[pb.HistoryEvent] = field(default_factory=list)
69+
dispatched_operations: list[pb.HistoryEvent] = field(default_factory=list)
6970
completion_token: int = 0
7071

7172

@@ -515,6 +516,7 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
515516
# Drain all pending operations into a batch
516517
operations = list(entity.pending_operations)
517518
entity.pending_operations.clear()
519+
entity.dispatched_operations = list(operations)
518520

519521
# Use V2 EntityRequest format so the worker
520522
# can properly build operation_infos
@@ -586,9 +588,25 @@ def CompleteOrchestratorTask(self, request: pb.OrchestratorResponse, context):
586588
elif evt.HasField("executionResumed"):
587589
instance.status = pb.ORCHESTRATION_STATUS_RUNNING
588590

589-
# Process actions
590-
for action in request.actions:
591-
self._process_action(instance, action)
591+
# Process actions — wrapped in try/except to ensure the
592+
# orchestration is never left permanently stuck in the
593+
# in-flight set when an action handler raises.
594+
try:
595+
for action in request.actions:
596+
self._process_action(instance, action)
597+
except Exception as e:
598+
self._logger.error(
599+
f"Error processing actions for instance "
600+
f"'{request.instanceId}': {e}")
601+
# Mark the orchestration as failed so it doesn't stay
602+
# in a running/pending state with no way to make progress.
603+
if not self._is_terminal_status(instance.status):
604+
instance.status = pb.ORCHESTRATION_STATUS_FAILED
605+
instance.failure_details = pb.TaskFailureDetails(
606+
errorType=type(e).__name__,
607+
errorMessage=str(e),
608+
isNonRetriable=True,
609+
)
592610

593611
# Update completion token for next execution
594612
instance.completion_token = self._next_completion_token
@@ -666,6 +684,7 @@ def CompleteEntityTask(self, request: pb.EntityBatchResult, context):
666684
else:
667685
entity.serialized_state = None
668686
entity.last_modified_at = datetime.now(timezone.utc)
687+
entity.dispatched_operations.clear()
669688

670689
# Update completion token for next batch
671690
entity.completion_token = self._next_completion_token
@@ -674,62 +693,68 @@ def CompleteEntityTask(self, request: pb.EntityBatchResult, context):
674693
# Clear the in-flight flag
675694
self._entity_in_flight.discard(entity.instance_id)
676695

677-
# Deliver operation results to calling orchestrations
696+
# Deliver operation results to calling orchestrations.
697+
# Each delivery is individually guarded so that one failure
698+
# does not prevent subsequent results from being delivered.
678699
for i, op_info in enumerate(request.operationInfos):
679-
dest = op_info.responseDestination
680-
if dest and dest.instanceId:
681-
parent_instance_id = op_info.responseDestination.instanceId
682-
parent_instance = self._instances.get(parent_instance_id)
683-
if parent_instance:
684-
result = request.results[i] if i < len(request.results) else None
685-
if result and result.HasField("success"):
686-
event = pb.HistoryEvent(
687-
eventId=-1,
688-
timestamp=timestamp_pb2.Timestamp(),
689-
entityOperationCompleted=pb.EntityOperationCompletedEvent(
690-
requestId=op_info.requestId,
691-
output=result.success.result,
700+
try:
701+
dest = op_info.responseDestination
702+
if dest and dest.instanceId:
703+
parent_instance_id = op_info.responseDestination.instanceId
704+
parent_instance = self._instances.get(parent_instance_id)
705+
if parent_instance:
706+
result = request.results[i] if i < len(request.results) else None
707+
if result and result.HasField("success"):
708+
event = pb.HistoryEvent(
709+
eventId=-1,
710+
timestamp=timestamp_pb2.Timestamp(),
711+
entityOperationCompleted=pb.EntityOperationCompletedEvent(
712+
requestId=op_info.requestId,
713+
output=result.success.result,
714+
)
692715
)
693-
)
694-
elif result and result.HasField("failure"):
695-
event = pb.HistoryEvent(
696-
eventId=-1,
697-
timestamp=timestamp_pb2.Timestamp(),
698-
entityOperationFailed=pb.EntityOperationFailedEvent(
699-
requestId=op_info.requestId,
700-
failureDetails=result.failure.failureDetails,
716+
elif result and result.HasField("failure"):
717+
event = pb.HistoryEvent(
718+
eventId=-1,
719+
timestamp=timestamp_pb2.Timestamp(),
720+
entityOperationFailed=pb.EntityOperationFailedEvent(
721+
requestId=op_info.requestId,
722+
failureDetails=result.failure.failureDetails,
723+
)
701724
)
702-
)
703-
else:
704-
continue
705-
706-
parent_instance.pending_events.append(event)
707-
parent_instance.last_updated_at = datetime.now(timezone.utc)
708-
self._enqueue_orchestration(parent_instance_id)
709-
710-
# Process side-effect actions (signals to other entities, new orchestrations)
725+
else:
726+
continue
727+
728+
parent_instance.pending_events.append(event)
729+
parent_instance.last_updated_at = datetime.now(timezone.utc)
730+
self._enqueue_orchestration(parent_instance_id)
731+
except Exception:
732+
self._logger.exception(
733+
f"Error delivering entity result for operation {i}")
734+
735+
# Process side-effect actions (signals to other entities, new orchestrations).
736+
# Each action is individually guarded for the same reason.
711737
for action in request.actions:
712-
if action.HasField("sendSignal"):
713-
signal = action.sendSignal
714-
self._signal_entity_internal(
715-
signal.instanceId, signal.name,
716-
signal.input.value if signal.input else None
717-
)
718-
elif action.HasField("startNewOrchestration"):
719-
start_orch = action.startNewOrchestration
720-
orch_input = start_orch.input.value if start_orch.input else None
721-
orch_version = start_orch.version.value \
722-
if start_orch.HasField("version") else None
723-
instance_id = start_orch.instanceId or uuid.uuid4().hex
724-
try:
738+
try:
739+
if action.HasField("sendSignal"):
740+
signal = action.sendSignal
741+
self._signal_entity_internal(
742+
signal.instanceId, signal.name,
743+
signal.input.value if signal.input else None
744+
)
745+
elif action.HasField("startNewOrchestration"):
746+
start_orch = action.startNewOrchestration
747+
orch_input = start_orch.input.value if start_orch.input else None
748+
orch_version = start_orch.version.value \
749+
if start_orch.HasField("version") else None
750+
instance_id = start_orch.instanceId or uuid.uuid4().hex
725751
self._create_instance_internal(
726752
instance_id, start_orch.name, orch_input,
727753
version=orch_version,
728754
)
729-
except Exception:
730-
self._logger.warning(
731-
f"Failed to create orchestration '{instance_id}' from entity action"
732-
)
755+
except Exception:
756+
self._logger.exception(
757+
"Error processing entity side-effect action")
733758

734759
# If the entity has more pending operations, re-enqueue
735760
if entity.pending_operations:
@@ -964,11 +989,35 @@ def AbandonTaskActivityWorkItem(self, request: pb.AbandonActivityTaskRequest, co
964989
return pb.AbandonActivityTaskResponse()
965990

966991
def AbandonTaskOrchestratorWorkItem(self, request: pb.AbandonOrchestrationTaskRequest, context):
967-
"""Abandons an orchestration work item."""
992+
"""Abandons an orchestration work item, restoring it for re-processing."""
993+
with self._lock:
994+
for instance_id in list(self._orchestration_in_flight):
995+
instance = self._instances.get(instance_id)
996+
if instance and str(instance.completion_token) == request.completionToken:
997+
# Move dispatched events back to pending so they will
998+
# be re-sent on the next dispatch cycle.
999+
instance.pending_events = list(instance.dispatched_events) + list(instance.pending_events)
1000+
instance.dispatched_events.clear()
1001+
self._orchestration_in_flight.discard(instance_id)
1002+
self._enqueue_orchestration(instance_id)
1003+
self._logger.info(
1004+
f"Abandoned orchestration work item for '{instance_id}'")
1005+
break
9681006
return pb.AbandonOrchestrationTaskResponse()
9691007

9701008
def AbandonTaskEntityWorkItem(self, request: pb.AbandonEntityTaskRequest, context):
971-
"""Abandons an entity work item."""
1009+
"""Abandons an entity work item, restoring it for re-processing."""
1010+
with self._lock:
1011+
for entity in self._entities.values():
1012+
if str(entity.completion_token) == request.completionToken:
1013+
# Restore dispatched operations back to pending.
1014+
entity.pending_operations = list(entity.dispatched_operations) + list(entity.pending_operations)
1015+
entity.dispatched_operations.clear()
1016+
self._entity_in_flight.discard(entity.instance_id)
1017+
self._enqueue_entity(entity.instance_id)
1018+
self._logger.info(
1019+
f"Abandoned entity work item for '{entity.instance_id}'")
1020+
break
9721021
return pb.AbandonEntityTaskResponse()
9731022

9741023
# Internal helper methods

0 commit comments

Comments
 (0)