Skip to content

Commit 9fcdc6d

Browse files
Copilotandystaples
andauthored
Merge base branch and align test structure for batch actions (#112)
* Initial plan * Add comprehensive batch action tests and clean_entity_storage method Co-authored-by: andystaples <77818326+andystaples@users.noreply.github.com> * Merge base branch and resolve conflicts Co-authored-by: andystaples <77818326+andystaples@users.noreply.github.com> * Address PR review comments on test improvements Co-authored-by: andystaples <77818326+andystaples@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: andystaples <77818326+andystaples@users.noreply.github.com>
1 parent 1791b65 commit 9fcdc6d

File tree

2 files changed

+393
-1
lines changed

2 files changed

+393
-1
lines changed

durabletask/client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ class PurgeInstancesResult:
6363
is_complete: bool
6464

6565

66+
@dataclass
67+
class CleanEntityStorageResult:
68+
empty_entities_removed: int
69+
orphaned_locks_released: int
70+
71+
6672
class OrchestrationFailedError(Exception):
6773
def __init__(self, message: str, failure_details: task.FailureDetails):
6874
super().__init__(message)
@@ -406,3 +412,34 @@ def get_entities_by(self,
406412
else:
407413
break
408414
return entities
415+
416+
def clean_entity_storage(self,
417+
remove_empty_entities: bool = True,
418+
release_orphaned_locks: bool = True,
419+
_continuation_token: Optional[pb2.StringValue] = None
420+
) -> CleanEntityStorageResult:
421+
self._logger.info("Cleaning entity storage")
422+
423+
empty_entities_removed = 0
424+
orphaned_locks_released = 0
425+
426+
while True:
427+
req = pb.CleanEntityStorageRequest(
428+
removeEmptyEntities=remove_empty_entities,
429+
releaseOrphanedLocks=release_orphaned_locks,
430+
continuationToken=_continuation_token
431+
)
432+
resp: pb.CleanEntityStorageResponse = self._stub.CleanEntityStorage(req)
433+
empty_entities_removed += resp.emptyEntitiesRemoved
434+
orphaned_locks_released += resp.orphanedLocksReleased
435+
436+
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
437+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, cleaning next page...")
438+
if _continuation_token and _continuation_token.value and _continuation_token.value == resp.continuationToken.value:
439+
self._logger.warning(f"Received the same continuation token value {resp.continuationToken.value} again, stopping to avoid infinite loop.")
440+
break
441+
_continuation_token = resp.continuationToken
442+
else:
443+
break
444+
445+
return CleanEntityStorageResult(empty_entities_removed, orphaned_locks_released)

0 commit comments

Comments
 (0)