-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathactivity_sequence.py
More file actions
52 lines (40 loc) · 2.27 KB
/
activity_sequence.py
File metadata and controls
52 lines (40 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from azure.identity import DefaultAzureCredential
from durabletask import client, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
def hello(ctx: task.ActivityContext, name: str) -> str:
    """Build a greeting for the given name.

    Args:
        ctx: Activity context supplied by the caller; not used here.
        name: The value to greet.

    Returns:
        The text 'Hello <name>!'.
    """
    greeting = 'Hello {}!'.format(name)
    return greeting
def sequence(ctx: task.OrchestrationContext, _):
    """Orchestrator that runs the 'hello' activity for three cities, one after another.

    Each activity call is yielded on its own, so the next call is only made
    once the value for the previous yield has been sent back in.

    Args:
        ctx: Orchestration context used to schedule the activity calls.
        _: Orchestration input; ignored.

    Returns:
        A list with the three activity results, in call order
        (Tokyo, Seattle, London).
    """
    greetings = []
    # invoke the "hello" activity once per city, strictly in this order
    for city in ('Tokyo', 'Seattle', 'London'):
        greeting = yield ctx.call_activity(hello, input=city)
        greetings.append(greeting)
    # hand the collected results back as a list
    return greetings
# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")

# Set credential to None for emulator, or DefaultAzureCredential for Azure.
# An "https://" endpoint is treated as the signal for a secured channel.
secure_channel = endpoint.startswith("https://")
credential = DefaultAzureCredential() if secure_channel else None

# The worker is used as a context manager; the client calls below are made
# inside the block, i.e. while the worker is still active.
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
                                taskhub=taskhub_name, token_credential=credential) as w:
    w.add_orchestrator(sequence)
    w.add_activity(hello)
    w.start()

    # Construct the client and run the orchestrations
    c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
                                   taskhub=taskhub_name, token_credential=credential)
    instance_id = c.schedule_new_orchestration(sequence)
    state = c.wait_for_orchestration_completion(instance_id, timeout=60)
    if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
        print(f'Orchestration completed! Result: {state.serialized_output}')
    elif state:
        print(f'Orchestration failed: {state.failure_details}')
    else:
        # No state object came back; report it instead of exiting silently.
        print(f'No orchestration state returned for instance {instance_id}')