Skip to content

Commit aa035cb

Browse files
Copilotandystaples
andcommitted
Add comprehensive batch action tests and clean_entity_storage method
Co-authored-by: andystaples <77818326+andystaples@users.noreply.github.com>
1 parent c10251c commit aa035cb

File tree

3 files changed

+726
-7
lines changed

3 files changed

+726
-7
lines changed

durabletask/client.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55
import uuid
66
from dataclasses import dataclass
7-
from datetime import datetime, timedelta, timezone
7+
from datetime import datetime, timezone
88
from enum import Enum
99
from typing import Any, List, Optional, Sequence, TypeVar, Union
1010

@@ -63,6 +63,12 @@ def __init__(self, deleted_instance_count: int, is_complete: bool):
6363
self.is_complete = is_complete
6464

6565

66+
class CleanEntityStorageResult:
67+
def __init__(self, empty_entities_removed: int, orphaned_locks_released: int):
68+
self.empty_entities_removed = empty_entities_removed
69+
self.orphaned_locks_released = orphaned_locks_released
70+
71+
6672
class OrchestrationFailedError(Exception):
6773
def __init__(self, message: str, failure_details: task.FailureDetails):
6874
super().__init__(message)
@@ -356,7 +362,7 @@ def get_entities_by(self,
356362
page_size: Optional[int] = None,
357363
_continuation_token: Optional[pb2.StringValue] = None
358364
) -> List[EntityMetadata]:
359-
self._logger.info(f"Getting entities")
365+
self._logger.info("Getting entities")
360366
query_request = pb.QueryEntitiesRequest(
361367
query=pb.EntityQuery(
362368
instanceIdStartsWith=helpers.get_string_value(instance_id_starts_with),
@@ -382,3 +388,30 @@ def get_entities_by(self,
382388
_continuation_token=resp.continuationToken
383389
)
384390
return entities
391+
392+
def clean_entity_storage(self,
393+
remove_empty_entities: bool = True,
394+
release_orphaned_locks: bool = True,
395+
_continuation_token: Optional[pb2.StringValue] = None
396+
) -> CleanEntityStorageResult:
397+
self._logger.info("Cleaning entity storage")
398+
req = pb.CleanEntityStorageRequest(
399+
removeEmptyEntities=remove_empty_entities,
400+
releaseOrphanedLocks=release_orphaned_locks,
401+
continuationToken=_continuation_token
402+
)
403+
resp: pb.CleanEntityStorageResponse = self._stub.CleanEntityStorage(req)
404+
empty_entities_removed = resp.emptyEntitiesRemoved
405+
orphaned_locks_released = resp.orphanedLocksReleased
406+
407+
if resp.continuationToken and resp.continuationToken.value != "0":
408+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, cleaning next page...")
409+
next_result = self.clean_entity_storage(
410+
remove_empty_entities=remove_empty_entities,
411+
release_orphaned_locks=release_orphaned_locks,
412+
_continuation_token=resp.continuationToken
413+
)
414+
empty_entities_removed += next_result.empty_entities_removed
415+
orphaned_locks_released += next_result.orphaned_locks_released
416+
417+
return CleanEntityStorageResult(empty_entities_removed, orphaned_locks_released)

0 commit comments

Comments
 (0)