Skip to content

Commit b274398

Browse files
committed
Feedback
1 parent 3e26a68 commit b274398

File tree

3 files changed

+67
-61
lines changed

3 files changed

+67
-61
lines changed

durabletask/client.py

Lines changed: 52 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ def raise_if_failed(self):
5757
self.failure_details)
5858

5959

60+
@dataclass
6061
class PurgeInstancesResult:
61-
def __init__(self, deleted_instance_count: int, is_complete: bool):
62-
self.deleted_instance_count = deleted_instance_count
63-
self.is_complete = is_complete
62+
deleted_instance_count: int
63+
is_complete: bool
6464

6565

6666
class OrchestrationFailedError(Exception):
@@ -183,7 +183,7 @@ def get_orchestration_state_by(self,
183183
_continuation_token: Optional[pb2.StringValue] = None
184184
) -> List[OrchestrationState]:
185185
if max_instance_count is None:
186-
# DTS backend does not behave well with max_instance_count = None, so we set to max 32-bit signed value
186+
# Some backends do not behave well with max_instance_count = None, so we set to max 32-bit signed value
187187
max_instance_count = (1 << 31) - 1
188188

189189
self._logger.info(f"Querying orchestration instances with filters - "
@@ -194,29 +194,31 @@ def get_orchestration_state_by(self,
194194
f"fetch_inputs_and_outputs={fetch_inputs_and_outputs}, "
195195
f"continuation_token={_continuation_token.value if _continuation_token else None}")
196196

197-
req = pb.QueryInstancesRequest(
198-
query=pb.InstanceQuery(
199-
runtimeStatus=[status.value for status in runtime_status] if runtime_status else None,
200-
createdTimeFrom=helpers.new_timestamp(created_time_from) if created_time_from else None,
201-
createdTimeTo=helpers.new_timestamp(created_time_to) if created_time_to else None,
202-
maxInstanceCount=max_instance_count,
203-
fetchInputsAndOutputs=fetch_inputs_and_outputs,
204-
continuationToken=_continuation_token
205-
)
206-
)
207-
resp: pb.QueryInstancesResponse = self._stub.QueryInstances(req)
208-
states = [parse_orchestration_state(res) for res in resp.orchestrationState]
209-
# Check the value for continuationToken - none or "0" indicates that there are no more results.
210-
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
211-
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next list of instances...")
212-
states += self.get_orchestration_state_by(
213-
created_time_from,
214-
created_time_to,
215-
runtime_status,
216-
max_instance_count,
217-
fetch_inputs_and_outputs,
218-
_continuation_token=resp.continuationToken
197+
states = []
198+
199+
while True:
200+
req = pb.QueryInstancesRequest(
201+
query=pb.InstanceQuery(
202+
runtimeStatus=[status.value for status in runtime_status] if runtime_status else None,
203+
createdTimeFrom=helpers.new_timestamp(created_time_from) if created_time_from else None,
204+
createdTimeTo=helpers.new_timestamp(created_time_to) if created_time_to else None,
205+
maxInstanceCount=max_instance_count,
206+
fetchInputsAndOutputs=fetch_inputs_and_outputs,
207+
continuationToken=_continuation_token
208+
)
219209
)
210+
resp: pb.QueryInstancesResponse = self._stub.QueryInstances(req)
211+
states += [parse_orchestration_state(res) for res in resp.orchestrationState]
212+
# Check the value for continuationToken - none or "0" indicates that there are no more results.
213+
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
214+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next list of instances...")
215+
if _continuation_token and _continuation_token.value and _continuation_token.value == resp.continuationToken.value:
216+
self._logger.warning(f"Received the same continuation token value {resp.continuationToken.value} again, stopping to avoid infinite loop.")
217+
break
218+
_continuation_token = resp.continuationToken
219+
else:
220+
break
221+
220222
states = [state for state in states if state is not None] # Filter out any None values
221223
return states
222224

@@ -377,28 +379,29 @@ def get_entities_by(self,
377379
f"include_state={include_state}, "
378380
f"include_transient={include_transient}, "
379381
f"page_size={page_size}")
380-
query_request = pb.QueryEntitiesRequest(
381-
query=pb.EntityQuery(
382-
instanceIdStartsWith=helpers.get_string_value(instance_id_starts_with),
383-
lastModifiedFrom=helpers.new_timestamp(last_modified_from) if last_modified_from else None,
384-
lastModifiedTo=helpers.new_timestamp(last_modified_to) if last_modified_to else None,
385-
includeState=include_state,
386-
includeTransient=include_transient,
387-
pageSize=helpers.get_int_value(page_size),
388-
continuationToken=_continuation_token
389-
)
390-
)
391-
resp: pb.QueryEntitiesResponse = self._stub.QueryEntities(query_request)
392-
entities = [EntityMetadata.from_entity_metadata(entity, query_request.query.includeState) for entity in resp.entities]
393-
if resp.continuationToken and resp.continuationToken.value != "0":
394-
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next page of entities...")
395-
entities += self.get_entities_by(
396-
instance_id_starts_with=instance_id_starts_with,
397-
last_modified_from=last_modified_from,
398-
last_modified_to=last_modified_to,
399-
include_state=include_state,
400-
include_transient=include_transient,
401-
page_size=page_size,
402-
_continuation_token=resp.continuationToken
382+
383+
entities = []
384+
385+
while True:
386+
query_request = pb.QueryEntitiesRequest(
387+
query=pb.EntityQuery(
388+
instanceIdStartsWith=helpers.get_string_value(instance_id_starts_with),
389+
lastModifiedFrom=helpers.new_timestamp(last_modified_from) if last_modified_from else None,
390+
lastModifiedTo=helpers.new_timestamp(last_modified_to) if last_modified_to else None,
391+
includeState=include_state,
392+
includeTransient=include_transient,
393+
pageSize=helpers.get_int_value(page_size),
394+
continuationToken=_continuation_token
395+
)
403396
)
397+
resp: pb.QueryEntitiesResponse = self._stub.QueryEntities(query_request)
398+
entities += [EntityMetadata.from_entity_metadata(entity, query_request.query.includeState) for entity in resp.entities]
399+
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
400+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next page of entities...")
401+
if _continuation_token and _continuation_token.value and _continuation_token.value == resp.continuationToken.value:
402+
self._logger.warning(f"Received the same continuation token value {resp.continuationToken.value} again, stopping to avoid infinite loop.")
403+
break
404+
_continuation_token = resp.continuationToken
405+
else:
406+
break
404407
return entities

tests/durabletask-azuremanaged/test_dts_batch_actions.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,18 @@ def test_get_all_orchestration_states():
3737
assert this_orch.instance_id == id
3838

3939
assert all_orchestrations is not None
40-
assert len(all_orchestrations) > 1
41-
print(f"Received {len(all_orchestrations)} orchestrations")
42-
assert len([o for o in all_orchestrations if o.instance_id == id]) == 1
43-
orchestration_state = [o for o in all_orchestrations if o.instance_id == id][0]
40+
matching_orchestrations = [o for o in all_orchestrations if o.instance_id == id]
41+
assert len(matching_orchestrations) == 1
42+
orchestration_state = matching_orchestrations[0]
4443
assert orchestration_state.runtime_status == client.OrchestrationStatus.COMPLETED
4544
assert orchestration_state.serialized_input is None
4645
assert orchestration_state.serialized_output is None
4746
assert orchestration_state.failure_details is None
4847

4948
assert all_orchestrations_with_state is not None
50-
assert len(all_orchestrations_with_state) > 1
51-
print(f"Received {len(all_orchestrations_with_state)} orchestrations")
52-
assert len([o for o in all_orchestrations_with_state if o.instance_id == id]) == 1
53-
orchestration_state = [o for o in all_orchestrations_with_state if o.instance_id == id][0]
49+
matching_orchestrations = [o for o in all_orchestrations_with_state if o.instance_id == id]
50+
assert len(matching_orchestrations) == 1
51+
orchestration_state = matching_orchestrations[0]
5452
assert orchestration_state.runtime_status == client.OrchestrationStatus.COMPLETED
5553
assert orchestration_state.serialized_input == '"Hello"'
5654
assert orchestration_state.serialized_output == '"Complete"'

tests/durabletask/test_batch_actions.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11

2+
import pytest
23
from durabletask import client, task, worker
34

5+
# NOTE: These tests assume a sidecar process is running. Example command:
6+
# go install github.com/microsoft/durabletask-go@main
7+
# durabletask-go --port 4001
8+
pytestmark = pytest.mark.e2e
9+
410

511
def empty_orchestrator(ctx: task.OrchestrationContext, _):
612
return "Complete"
@@ -23,10 +29,9 @@ def test_get_all_orchestration_states():
2329
assert this_orch.instance_id == id
2430

2531
assert all_orchestrations is not None
26-
assert len(all_orchestrations) > 1
27-
print(f"Received {len(all_orchestrations)} orchestrations")
28-
assert len([o for o in all_orchestrations if o.instance_id == id]) == 1
29-
orchestration_state = [o for o in all_orchestrations if o.instance_id == id][0]
32+
matching_orchestrations = [o for o in all_orchestrations if o.instance_id == id]
33+
assert len(matching_orchestrations) == 1
34+
orchestration_state = matching_orchestrations[0]
3035
assert orchestration_state.runtime_status == client.OrchestrationStatus.COMPLETED
3136
assert orchestration_state.serialized_input == '"Hello"'
3237
assert orchestration_state.serialized_output == '"Complete"'

0 commit comments

Comments
 (0)