Skip to content

Commit 1179a95

Browse files
committed
initial commit
1 parent 0e09bb8 commit 1179a95

File tree

8 files changed

+776
-270
lines changed

8 files changed

+776
-270
lines changed

durabletask/client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,26 @@ def resume_orchestration(self, instance_id: str):
225225
self._logger.info(f"Resuming instance '{instance_id}'.")
226226
self._stub.ResumeInstance(req)
227227

228+
def restart_orchestration(self, instance_id: str, *,
229+
restart_with_new_instance_id: bool = False) -> str:
230+
"""Restarts an existing orchestration instance.
231+
232+
Args:
233+
instance_id: The ID of the orchestration instance to restart.
234+
restart_with_new_instance_id: If True, the restarted orchestration will use a new instance ID.
235+
If False (default), the restarted orchestration will reuse the same instance ID.
236+
237+
Returns:
238+
The instance ID of the restarted orchestration.
239+
"""
240+
req = pb.RestartInstanceRequest(
241+
instanceId=instance_id,
242+
restartWithNewInstanceId=restart_with_new_instance_id)
243+
244+
self._logger.info(f"Restarting instance '{instance_id}'.")
245+
res: pb.RestartInstanceResponse = self._stub.RestartInstance(req)
246+
return res.instanceId
247+
228248
def purge_orchestration(self, instance_id: str, recursive: bool = True):
229249
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
230250
self._logger.info(f"Purging instance '{instance_id}'.")
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
443b333f4f65a438dc9eb4f090560d232afec4b7
22
fd9369c6a03d6af4e95285e432b7c4e943c06970
3+
026329c53fe6363985655857b9ca848ec7238bd2

durabletask/internal/orchestrator_service_pb2.py

Lines changed: 273 additions & 221 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

durabletask/internal/orchestrator_service_pb2.pyi

Lines changed: 208 additions & 41 deletions
Large diffs are not rendered by default.

durabletask/internal/orchestrator_service_pb2_grpc.py

Lines changed: 135 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
from durabletask.internal import orchestrator_service_pb2 as durabletask_dot_internal_dot_orchestrator__service__pb2
77
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
88

9-
GRPC_GENERATED_VERSION = '1.65.4'
9+
GRPC_GENERATED_VERSION = '1.78.0'
1010
GRPC_VERSION = grpc.__version__
11-
EXPECTED_ERROR_RELEASE = '1.66.0'
12-
SCHEDULED_RELEASE_DATE = 'August 6, 2024'
1311
_version_not_supported = False
1412

1513
try:
@@ -19,15 +17,12 @@
1917
_version_not_supported = True
2018

2119
if _version_not_supported:
22-
warnings.warn(
20+
raise RuntimeError(
2321
f'The grpc package installed is at version {GRPC_VERSION},'
24-
+ f' but the generated code in durabletask/internal/orchestrator_service_pb2_grpc.py depends on'
22+
+ ' but the generated code in durabletask/internal/orchestrator_service_pb2_grpc.py depends on'
2523
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
2624
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
2725
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
28-
+ f' This warning will become an error in {EXPECTED_ERROR_RELEASE},'
29-
+ f' scheduled for release on {SCHEDULED_RELEASE_DATE}.',
30-
RuntimeWarning
3126
)
3227

3328

@@ -60,6 +55,11 @@ def __init__(self, channel):
6055
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceRequest.SerializeToString,
6156
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceResponse.FromString,
6257
_registered_method=True)
58+
self.RestartInstance = channel.unary_unary(
59+
'/TaskHubSidecarService/RestartInstance',
60+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceRequest.SerializeToString,
61+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceResponse.FromString,
62+
_registered_method=True)
6363
self.WaitForInstanceStart = channel.unary_unary(
6464
'/TaskHubSidecarService/WaitForInstanceStart',
6565
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceRequest.SerializeToString,
@@ -95,6 +95,11 @@ def __init__(self, channel):
9595
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesRequest.SerializeToString,
9696
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesResponse.FromString,
9797
_registered_method=True)
98+
self.ListInstanceIds = channel.unary_unary(
99+
'/TaskHubSidecarService/ListInstanceIds',
100+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsRequest.SerializeToString,
101+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsResponse.FromString,
102+
_registered_method=True)
98103
self.PurgeInstances = channel.unary_unary(
99104
'/TaskHubSidecarService/PurgeInstances',
100105
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.PurgeInstancesRequest.SerializeToString,
@@ -170,6 +175,11 @@ def __init__(self, channel):
170175
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskRequest.SerializeToString,
171176
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskResponse.FromString,
172177
_registered_method=True)
178+
self.SkipGracefulOrchestrationTerminations = channel.unary_unary(
179+
'/TaskHubSidecarService/SkipGracefulOrchestrationTerminations',
180+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsRequest.SerializeToString,
181+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsResponse.FromString,
182+
_registered_method=True)
173183

174184

175185
class TaskHubSidecarServiceServicer(object):
@@ -203,6 +213,13 @@ def RewindInstance(self, request, context):
203213
context.set_details('Method not implemented!')
204214
raise NotImplementedError('Method not implemented!')
205215

216+
def RestartInstance(self, request, context):
217+
"""Restarts an orchestration instance.
218+
"""
219+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
220+
context.set_details('Method not implemented!')
221+
raise NotImplementedError('Method not implemented!')
222+
206223
def WaitForInstanceStart(self, request, context):
207224
"""Waits for an orchestration instance to reach a running or completion state.
208225
"""
@@ -253,6 +270,12 @@ def QueryInstances(self, request, context):
253270
context.set_details('Method not implemented!')
254271
raise NotImplementedError('Method not implemented!')
255272

273+
def ListInstanceIds(self, request, context):
274+
"""Missing associated documentation comment in .proto file."""
275+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
276+
context.set_details('Method not implemented!')
277+
raise NotImplementedError('Method not implemented!')
278+
256279
def PurgeInstances(self, request, context):
257280
"""Missing associated documentation comment in .proto file."""
258281
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
@@ -353,6 +376,14 @@ def AbandonTaskEntityWorkItem(self, request, context):
353376
context.set_details('Method not implemented!')
354377
raise NotImplementedError('Method not implemented!')
355378

379+
def SkipGracefulOrchestrationTerminations(self, request, context):
380+
""""Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
381+
Note that a maximum of 500 orchestrations can be terminated at a time using this method.
382+
"""
383+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
384+
context.set_details('Method not implemented!')
385+
raise NotImplementedError('Method not implemented!')
386+
356387

357388
def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
358389
rpc_method_handlers = {
@@ -376,6 +407,11 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
376407
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceRequest.FromString,
377408
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceResponse.SerializeToString,
378409
),
410+
'RestartInstance': grpc.unary_unary_rpc_method_handler(
411+
servicer.RestartInstance,
412+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceRequest.FromString,
413+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceResponse.SerializeToString,
414+
),
379415
'WaitForInstanceStart': grpc.unary_unary_rpc_method_handler(
380416
servicer.WaitForInstanceStart,
381417
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceRequest.FromString,
@@ -411,6 +447,11 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
411447
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesRequest.FromString,
412448
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesResponse.SerializeToString,
413449
),
450+
'ListInstanceIds': grpc.unary_unary_rpc_method_handler(
451+
servicer.ListInstanceIds,
452+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsRequest.FromString,
453+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsResponse.SerializeToString,
454+
),
414455
'PurgeInstances': grpc.unary_unary_rpc_method_handler(
415456
servicer.PurgeInstances,
416457
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.PurgeInstancesRequest.FromString,
@@ -486,6 +527,11 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
486527
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskRequest.FromString,
487528
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskResponse.SerializeToString,
488529
),
530+
'SkipGracefulOrchestrationTerminations': grpc.unary_unary_rpc_method_handler(
531+
servicer.SkipGracefulOrchestrationTerminations,
532+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsRequest.FromString,
533+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsResponse.SerializeToString,
534+
),
489535
}
490536
generic_handler = grpc.method_handlers_generic_handler(
491537
'TaskHubSidecarService', rpc_method_handlers)
@@ -605,6 +651,33 @@ def RewindInstance(request,
605651
metadata,
606652
_registered_method=True)
607653

654+
@staticmethod
655+
def RestartInstance(request,
656+
target,
657+
options=(),
658+
channel_credentials=None,
659+
call_credentials=None,
660+
insecure=False,
661+
compression=None,
662+
wait_for_ready=None,
663+
timeout=None,
664+
metadata=None):
665+
return grpc.experimental.unary_unary(
666+
request,
667+
target,
668+
'/TaskHubSidecarService/RestartInstance',
669+
durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceRequest.SerializeToString,
670+
durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceResponse.FromString,
671+
options,
672+
channel_credentials,
673+
insecure,
674+
call_credentials,
675+
compression,
676+
wait_for_ready,
677+
timeout,
678+
metadata,
679+
_registered_method=True)
680+
608681
@staticmethod
609682
def WaitForInstanceStart(request,
610683
target,
@@ -794,6 +867,33 @@ def QueryInstances(request,
794867
metadata,
795868
_registered_method=True)
796869

870+
@staticmethod
871+
def ListInstanceIds(request,
872+
target,
873+
options=(),
874+
channel_credentials=None,
875+
call_credentials=None,
876+
insecure=False,
877+
compression=None,
878+
wait_for_ready=None,
879+
timeout=None,
880+
metadata=None):
881+
return grpc.experimental.unary_unary(
882+
request,
883+
target,
884+
'/TaskHubSidecarService/ListInstanceIds',
885+
durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsRequest.SerializeToString,
886+
durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsResponse.FromString,
887+
options,
888+
channel_credentials,
889+
insecure,
890+
call_credentials,
891+
compression,
892+
wait_for_ready,
893+
timeout,
894+
metadata,
895+
_registered_method=True)
896+
797897
@staticmethod
798898
def PurgeInstances(request,
799899
target,
@@ -1198,3 +1298,30 @@ def AbandonTaskEntityWorkItem(request,
11981298
timeout,
11991299
metadata,
12001300
_registered_method=True)
1301+
1302+
@staticmethod
1303+
def SkipGracefulOrchestrationTerminations(request,
1304+
target,
1305+
options=(),
1306+
channel_credentials=None,
1307+
call_credentials=None,
1308+
insecure=False,
1309+
compression=None,
1310+
wait_for_ready=None,
1311+
timeout=None,
1312+
metadata=None):
1313+
return grpc.experimental.unary_unary(
1314+
request,
1315+
target,
1316+
'/TaskHubSidecarService/SkipGracefulOrchestrationTerminations',
1317+
durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsRequest.SerializeToString,
1318+
durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsResponse.FromString,
1319+
options,
1320+
channel_credentials,
1321+
insecure,
1322+
call_credentials,
1323+
compression,
1324+
wait_for_ready,
1325+
timeout,
1326+
metadata,
1327+
_registered_method=True)

durabletask/internal/proto_task_hub_sidecar_service_stub.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class ProtoTaskHubSidecarServiceStub(Protocol):
99
StartInstance: Callable[..., Any]
1010
GetInstance: Callable[..., Any]
1111
RewindInstance: Callable[..., Any]
12+
RestartInstance: Callable[..., Any]
1213
WaitForInstanceStart: Callable[..., Any]
1314
WaitForInstanceCompletion: Callable[..., Any]
1415
RaiseEvent: Callable[..., Any]

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@
2222
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
2323

2424

25+
def _get_credential():
26+
"""Returns DefaultAzureCredential if endpoint is https, otherwise None (for emulator)."""
27+
if endpoint.startswith("https://"):
28+
from azure.identity import DefaultAzureCredential
29+
return DefaultAzureCredential()
30+
return None
31+
32+
2533
def test_empty_orchestration():
2634

2735
invoked = False
@@ -371,6 +379,75 @@ def child(ctx: task.OrchestrationContext, _):
371379
assert state is None
372380

373381

382+
def test_restart_with_same_instance_id():
383+
def orchestrator(ctx: task.OrchestrationContext, _):
384+
result = yield ctx.call_activity(say_hello, input="World")
385+
return result
386+
387+
def say_hello(ctx: task.ActivityContext, input: str):
388+
return f"Hello, {input}!"
389+
390+
credential = _get_credential()
391+
392+
# Start a worker, which will connect to the sidecar in a background thread
393+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
394+
taskhub=taskhub_name, token_credential=credential) as w:
395+
w.add_orchestrator(orchestrator)
396+
w.add_activity(say_hello)
397+
w.start()
398+
399+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
400+
taskhub=taskhub_name, token_credential=credential)
401+
id = task_hub_client.schedule_new_orchestration(orchestrator)
402+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
403+
assert state is not None
404+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
405+
assert state.serialized_output == json.dumps("Hello, World!")
406+
407+
# Restart the orchestration with the same instance ID
408+
restarted_id = task_hub_client.restart_orchestration(id)
409+
assert restarted_id == id
410+
411+
state = task_hub_client.wait_for_orchestration_completion(restarted_id, timeout=30)
412+
assert state is not None
413+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
414+
assert state.serialized_output == json.dumps("Hello, World!")
415+
416+
417+
def test_restart_with_new_instance_id():
418+
def orchestrator(ctx: task.OrchestrationContext, _):
419+
result = yield ctx.call_activity(say_hello, input="World")
420+
return result
421+
422+
def say_hello(ctx: task.ActivityContext, input: str):
423+
return f"Hello, {input}!"
424+
425+
credential = _get_credential()
426+
427+
# Start a worker, which will connect to the sidecar in a background thread
428+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
429+
taskhub=taskhub_name, token_credential=credential) as w:
430+
w.add_orchestrator(orchestrator)
431+
w.add_activity(say_hello)
432+
w.start()
433+
434+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
435+
taskhub=taskhub_name, token_credential=credential)
436+
id = task_hub_client.schedule_new_orchestration(orchestrator)
437+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
438+
assert state is not None
439+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
440+
441+
# Restart the orchestration with a new instance ID
442+
restarted_id = task_hub_client.restart_orchestration(id, restart_with_new_instance_id=True)
443+
assert restarted_id != id
444+
445+
state = task_hub_client.wait_for_orchestration_completion(restarted_id, timeout=30)
446+
assert state is not None
447+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
448+
assert state.serialized_output == json.dumps("Hello, World!")
449+
450+
374451
# def test_continue_as_new():
375452
# all_results = []
376453

0 commit comments

Comments
 (0)