-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathhuman_interaction.py
More file actions
152 lines (121 loc) · 6.53 KB
/
human_interaction.py
File metadata and controls
152 lines (121 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""End-to-end sample that demonstrates how to configure an orchestrator
that waits for an "approval" event before proceeding to the next step. If
the approval isn't received within a specified timeout, the order that is
represented by the orchestration is automatically cancelled."""
import os
import threading
import time
from collections import namedtuple
from dataclasses import dataclass
from datetime import timedelta
from azure.identity import DefaultAzureCredential
from durabletask import client, task, worker
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
@dataclass
class Order:
    """A purchase order: what is being bought, how many, and at what cost."""

    # Cost of the order; the workflow compares it against its approval threshold.
    Cost: float
    # Name of the product being ordered.
    Product: str
    # Number of units ordered.
    Quantity: int

    def __str__(self) -> str:
        """Return the short display form ``"<Product> (<Quantity>)"``."""
        return "{} ({})".format(self.Product, self.Quantity)
def send_approval_request(_: task.ActivityContext, order: Order) -> None:
    """Activity that announces an approval request for *order*.

    The activity context (first argument) is not used. The function blocks
    for five seconds and then prints a single notification line; no request
    is actually delivered anywhere.
    """
    delay_seconds = 5
    # NOTE(review): the pause presumably stands in for a slow external call.
    time.sleep(delay_seconds)
    notice = f'*** Sending approval request for order: {order}'
    print(notice)
def place_order(_: task.ActivityContext, order: Order) -> None:
    """Activity that places *order*.

    The activity context (first argument) is not used; in this sample the
    order is "placed" simply by printing a line that describes it.
    """
    message = f'*** Placing order: {order}'
    print(message)
def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
    """Orchestrator (generator) that drives a purchase order to a final outcome.

    Flow:
      * An order costing less than 1000 finishes immediately with
        ``"Auto-approved"``.
      * Otherwise the ``send_approval_request`` activity runs, and the
        orchestration then waits for whichever happens first: an
        ``"approval_received"`` external event or a 24-hour timer.
      * If the timer wins, the result is ``"Cancelled"``.
      * If the event wins, the ``place_order`` activity runs and the result
        names the approver, read from the event payload's ``approver``
        attribute.
    """
    # Cheap orders skip the approval round-trip entirely.
    if order.Cost < 1000:
        return "Auto-approved"

    # Everything else must be signed off by a manager first.
    yield ctx.call_activity(send_approval_request, input=order)

    # Race the approval against a 24-hour deadline. The two tasks are
    # created in the same order as before (event first, then timer).
    approval_task = ctx.wait_for_external_event("approval_received")
    deadline_task = ctx.create_timer(timedelta(hours=24))
    first_completed = yield task.when_any([approval_task, deadline_task])

    # Deadline elapsed before any approval arrived.
    if first_completed == deadline_task:
        return "Cancelled"

    # Approval arrived in time: place the order, then report who approved it.
    yield ctx.call_activity(place_order, input=order)
    approver = approval_task.get_result().approver
    return f"Approved by '{approver}'"
def _register_tasks(w) -> None:
    """Register the workflow's orchestrator and both activities on worker *w*."""
    w.add_orchestrator(purchase_order_workflow)
    w.add_activity(send_approval_request)
    w.add_activity(place_order)


def _run_purchase_order(c, cost, approver, timeout) -> None:
    """Schedule one purchase-order orchestration via *c* and report its outcome.

    Shared by the local and the Durable Task Scheduler code paths, which
    previously duplicated this logic line for line.

    Args:
        c: Client used to schedule the orchestration, raise the approval
            event, and wait for completion.
        cost: Cost of the order handed to the workflow.
        approver: Name sent in the "approval_received" event payload.
        timeout: Seconds to wait for the orchestration to finish; two extra
            seconds of slack are added to the actual wait.

    Raises:
        Whatever ``state.raise_if_failed()`` raises when the orchestration
        ended in a failed state.
    """
    # Start a purchase order workflow using the user input
    order = Order(cost, "MyProduct", 1)
    instance_id = c.schedule_new_orchestration(purchase_order_workflow, input=order)

    def prompt_for_approval():
        """Block until the user presses ENTER, then raise the approval event."""
        input("Press [ENTER] to approve the order...\n")
        approval_event = namedtuple("Approval", ["approver"])(approver)
        c.raise_orchestration_event(instance_id, "approval_received", data=approval_event)

    # Prompt on a daemon thread so a pending input() never keeps the process
    # alive once the wait below has finished.
    threading.Thread(target=prompt_for_approval, daemon=True).start()

    # Wait for the orchestration to complete. Only the wait itself is
    # guarded, so a TimeoutError from anywhere else is not mislabelled.
    try:
        state = c.wait_for_orchestration_completion(instance_id, timeout=timeout + 2)
    except TimeoutError:
        print("*** Orchestration timed out!")
        return

    if not state:
        print("Workflow not found!")  # not expected
    elif state.runtime_status == client.OrchestrationStatus.COMPLETED:
        print(f'Orchestration completed! Result: {state.serialized_output}')
    else:
        state.raise_if_failed()  # raises an exception if the orchestration failed
        # Reached only for a terminal state that is neither completed nor
        # failed; report it instead of exiting silently.
        print(f'Orchestration ended with status: {state.runtime_status}')


def main() -> None:
    """Parse the command line and run the demo against the selected backend.

    With ``--local`` a plain gRPC worker and client are used. Otherwise the
    Durable Task Scheduler worker and client are used, configured from the
    ``TASKHUB`` and ``ENDPOINT`` environment variables.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Order purchasing workflow demo.")
    # Order.Cost is a float, so fractional costs are accepted; whole numbers
    # still parse exactly as before.
    parser.add_argument("--cost", type=float, default=2000, help="Cost of the order")
    parser.add_argument("--approver", type=str, default="Me", help="Approver name")
    # This bounds how long the client waits, not the workflow's own 24-hour
    # approval deadline.
    parser.add_argument("--timeout", type=int, default=60, help="Timeout in seconds")
    parser.add_argument("--local", action="store_true", help="Use local worker instead of DurableTaskScheduler")
    args = parser.parse_args()

    if args.local:
        # Use local worker (original implementation)
        with worker.TaskHubGrpcWorker() as w:
            _register_tasks(w)
            w.start()

            c = client.TaskHubGrpcClient()
            _run_purchase_order(c, args.cost, args.approver, args.timeout)
    else:
        # Use DurableTaskScheduler
        # Use environment variables if provided, otherwise use default emulator values
        taskhub_name = os.getenv("TASKHUB", "default")
        endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

        print(f"Using taskhub: {taskhub_name}")
        print(f"Using endpoint: {endpoint}")

        # Set credential to None for emulator, or DefaultAzureCredential for Azure
        secure_channel = endpoint.startswith("https://")
        credential = DefaultAzureCredential() if secure_channel else None

        with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
                                        taskhub=taskhub_name, token_credential=credential) as w:
            _register_tasks(w)
            w.start()

            # Construct the client and run the orchestrations
            c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
                                           taskhub=taskhub_name, token_credential=credential)
            _run_purchase_order(c, args.cost, args.approver, args.timeout)


if __name__ == "__main__":
    main()