Skip to content

Commit a99dac2

Browse files
committed
Add orchestration versioning support
1 parent a84f7d0 commit a99dac2

File tree

4 files changed

+126
-312
lines changed

4 files changed

+126
-312
lines changed

durabletask/internal/helpers.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ def new_orchestrator_completed_event() -> pb.HistoryEvent:
2626

2727

2828
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None,
29-
tags: Optional[dict[str, str]] = None) -> pb.HistoryEvent:
29+
tags: Optional[dict[str, str]] = None,
30+
version: Optional[str] = None) -> pb.HistoryEvent:
3031
return pb.HistoryEvent(
3132
eventId=-1,
3233
timestamp=timestamp_pb2.Timestamp(),
3334
executionStarted=pb.ExecutionStartedEvent(
3435
name=name,
36+
version=get_string_value(version),
3537
input=get_string_value(encoded_input),
3638
orchestrationInstance=pb.OrchestrationInstance(instanceId=instance_id),
3739
tags=tags))
@@ -85,12 +87,14 @@ def new_sub_orchestration_created_event(
8587
event_id: int,
8688
name: str,
8789
instance_id: str,
88-
encoded_input: Optional[str] = None) -> pb.HistoryEvent:
90+
encoded_input: Optional[str] = None,
91+
version: Optional[str] = None) -> pb.HistoryEvent:
8992
return pb.HistoryEvent(
9093
eventId=event_id,
9194
timestamp=timestamp_pb2.Timestamp(),
9295
subOrchestrationInstanceCreated=pb.SubOrchestrationInstanceCreatedEvent(
9396
name=name,
97+
version=get_string_value(version),
9498
input=get_string_value(encoded_input),
9599
instanceId=instance_id)
96100
)

durabletask/testing/in_memory_backend.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class OrchestrationInstance:
3434
instance_id: str
3535
name: str
3636
status: pb.OrchestrationStatus
37+
version: Optional[str] = None
3738
input: Optional[str] = None
3839
output: Optional[str] = None
3940
custom_status: Optional[str] = None
@@ -225,10 +226,12 @@ def StartInstance(self, request: pb.CreateInstanceRequest, context):
225226
start_time = request.scheduledStartTimestamp.ToDatetime(tzinfo=timezone.utc) \
226227
if request.HasField("scheduledStartTimestamp") else now
227228

229+
version = request.version.value if request.HasField("version") else None
228230
instance = OrchestrationInstance(
229231
instance_id=instance_id,
230232
name=request.name,
231233
status=pb.ORCHESTRATION_STATUS_PENDING,
234+
version=version,
232235
input=request.input.value if request.input else None,
233236
created_at=now,
234237
last_updated_at=now,
@@ -242,7 +245,8 @@ def StartInstance(self, request: pb.CreateInstanceRequest, context):
242245
execution_started = helpers.new_execution_started_event(
243246
request.name, instance_id,
244247
request.input.value if request.input else None,
245-
dict(request.tags) if request.tags else None
248+
dict(request.tags) if request.tags else None,
249+
version=version,
246250
)
247251

248252
instance.pending_events.append(orchestrator_started)
@@ -413,6 +417,7 @@ def RestartInstance(self, request: pb.RestartInstanceRequest, context):
413417

414418
name = instance.name
415419
original_input = instance.input
420+
version = instance.version
416421

417422
if request.restartWithNewInstanceId:
418423
new_instance_id = uuid.uuid4().hex
@@ -423,7 +428,8 @@ def RestartInstance(self, request: pb.RestartInstanceRequest, context):
423428
self._orchestration_queue_set.discard(request.instanceId)
424429
self._state_waiters.pop(request.instanceId, None)
425430

426-
self._create_instance_internal(new_instance_id, name, original_input)
431+
self._create_instance_internal(
432+
new_instance_id, name, original_input, version=version)
427433

428434
self._logger.info(
429435
f"Restarted instance '{request.instanceId}' as '{new_instance_id}'")
@@ -712,10 +718,13 @@ def CompleteEntityTask(self, request: pb.EntityBatchResult, context):
712718
elif action.HasField("startNewOrchestration"):
713719
start_orch = action.startNewOrchestration
714720
orch_input = start_orch.input.value if start_orch.input else None
721+
orch_version = start_orch.version.value \
722+
if start_orch.HasField("version") else None
715723
instance_id = start_orch.instanceId or uuid.uuid4().hex
716724
try:
717725
self._create_instance_internal(
718-
instance_id, start_orch.name, orch_input
726+
instance_id, start_orch.name, orch_input,
727+
version=orch_version,
719728
)
720729
except Exception:
721730
self._logger.warning(
@@ -984,7 +993,8 @@ def _is_terminal_status_check(self, instance: OrchestrationInstance) -> bool:
984993
return self._is_terminal_status(instance.status)
985994

986995
def _create_instance_internal(self, instance_id: str, name: str,
987-
encoded_input: Optional[str] = None):
996+
encoded_input: Optional[str] = None,
997+
version: Optional[str] = None):
988998
"""Creates a new instance directly in internal state (no gRPC context needed)."""
989999
existing = self._instances.get(instance_id)
9901000
if existing:
@@ -1000,6 +1010,7 @@ def _create_instance_internal(self, instance_id: str, name: str,
10001010
instance_id=instance_id,
10011011
name=name,
10021012
status=pb.ORCHESTRATION_STATUS_PENDING,
1013+
version=version,
10031014
input=encoded_input,
10041015
created_at=now,
10051016
last_updated_at=now,
@@ -1008,7 +1019,8 @@ def _create_instance_internal(self, instance_id: str, name: str,
10081019
self._next_completion_token += 1
10091020

10101021
orchestrator_started = helpers.new_orchestrator_started_event(now)
1011-
execution_started = helpers.new_execution_started_event(name, instance_id, encoded_input)
1022+
execution_started = helpers.new_execution_started_event(
1023+
name, instance_id, encoded_input, version=version)
10121024
instance.pending_events.append(orchestrator_started)
10131025
instance.pending_events.append(execution_started)
10141026

@@ -1149,9 +1161,14 @@ def _process_complete_orchestration_action(self, instance: OrchestrationInstance
11491161
new_input = complete_action.result.value if complete_action.result else None
11501162
carryover_events = list(complete_action.carryoverEvents)
11511163

1164+
# Update version if a new version was specified
1165+
new_version = complete_action.newVersion.value \
1166+
if complete_action.HasField("newVersion") else instance.version
1167+
11521168
# Reset instance state
11531169
instance.history.clear()
11541170
instance.input = new_input
1171+
instance.version = new_version
11551172
instance.output = None
11561173
instance.failure_details = None
11571174
instance.status = pb.ORCHESTRATION_STATUS_PENDING
@@ -1166,7 +1183,8 @@ def _process_complete_orchestration_action(self, instance: OrchestrationInstance
11661183
now = datetime.now(timezone.utc)
11671184
orchestrator_started = helpers.new_orchestrator_started_event(now)
11681185
execution_started = helpers.new_execution_started_event(
1169-
instance.name, instance.instance_id, new_input
1186+
instance.name, instance.instance_id, new_input,
1187+
version=new_version,
11701188
)
11711189
instance.pending_events.append(orchestrator_started)
11721190
instance.pending_events.append(execution_started)
@@ -1241,9 +1259,12 @@ def _process_create_sub_orchestration_action(self, instance: OrchestrationInstan
12411259
name = create_sub_orch.name
12421260
sub_instance_id = create_sub_orch.instanceId
12431261
input_value = create_sub_orch.input.value if create_sub_orch.input else None
1262+
version = create_sub_orch.version.value \
1263+
if create_sub_orch.HasField("version") else None
12441264

12451265
# Add SubOrchestrationInstanceCreated event to history
1246-
event = helpers.new_sub_orchestration_created_event(task_id, name, sub_instance_id, input_value)
1266+
event = helpers.new_sub_orchestration_created_event(
1267+
task_id, name, sub_instance_id, input_value, version=version)
12471268
instance.history.append(event)
12481269

12491270
# Mark instance as running
@@ -1252,7 +1273,8 @@ def _process_create_sub_orchestration_action(self, instance: OrchestrationInstan
12521273

12531274
# Create the sub-orchestration directly via internal state
12541275
try:
1255-
self._create_instance_internal(sub_instance_id, name, input_value)
1276+
self._create_instance_internal(
1277+
sub_instance_id, name, input_value, version=version)
12561278

12571279
# Watch for sub-orchestration completion
12581280
self._watch_sub_orchestration(instance.instance_id, sub_instance_id, task_id)

0 commit comments

Comments
 (0)