Convert the sync client into a wrapper of the async#31
Signed-off-by: Albert Callarisa <albert@diagrid.io>
> nice effort and in general people use asyncio code, so better to optimize for that path and slowly consolidate
Pull request overview
This PR refactors the synchronous client to be a thin wrapper around the async client using anyio's start_blocking_portal() to bridge sync and async code. The core orchestration logic (OrchestrationStatus, OrchestrationState, OrchestrationFailedError) has been moved from the sync client to the async client, eliminating code duplication.
Changes:
- Moved OrchestrationStatus, OrchestrationState, OrchestrationFailedError, and related helper functions from durabletask/client.py to durabletask/aio/client.py
- Converted TaskHubGrpcClient to a wrapper that uses anyio's blocking portal to call the async client methods
- Updated tests to use AsyncMock instead of Mock and import OrchestrationStatus from durabletask.aio.client
- Added anyio>=4.0.0,<5 as a dependency
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| durabletask/client.py | Replaced the full sync implementation with a wrapper that uses anyio's blocking portal to call async methods |
| durabletask/aio/client.py | Moved OrchestrationStatus, OrchestrationState, OrchestrationFailedError and helper functions from sync client to centralize the implementation |
| tests/durabletask/test_orchestration_wait.py | Updated to use AsyncMock and access internal _async_client._stub for mocking |
| tests/durabletask/test_orchestration_e2e.py | Updated to import OrchestrationStatus from durabletask.aio.client |
| tests/durabletask/test_orchestration_e2e_async.py | Updated import to get OrchestrationStatus from durabletask.aio.client instead of durabletask.client |
| pyproject.toml | Added anyio>=4.0.0,<5 dependency |
Comments suppressed due to low confidence (1)
tests/durabletask/test_orchestration_wait.py:69
- The tests create a TaskHubGrpcClient but never call close() on it, which will leave the portal and async resources open. This can lead to resource leaks and warnings about unclosed resources. Consider using a context manager or explicitly calling close() at the end of the test, or use a pytest fixture with proper cleanup.
```python
    c = TaskHubGrpcClient()
    c._async_client._stub = AsyncMock()
    c._async_client._stub.WaitForInstanceStart.return_value = response
    grpc_timeout = None if timeout is None else timeout
    c.wait_for_orchestration_start(instance_id, timeout=grpc_timeout)
    # Verify WaitForInstanceStart was called with the expected mapped timeout
    c._async_client._stub.WaitForInstanceStart.assert_called_once()
    _, kwargs = c._async_client._stub.WaitForInstanceStart.call_args
    if timeout is None or timeout == 0:
        assert kwargs.get("timeout") is None
    else:
        assert kwargs.get("timeout") == timeout


@pytest.mark.parametrize("timeout", [None, 0, 5])
def test_wait_for_orchestration_completion_timeout(timeout):
    instance_id = "test-instance"
    from durabletask.internal.orchestrator_service_pb2 import (
        ORCHESTRATION_STATUS_COMPLETED,
        GetInstanceResponse,
        OrchestrationState,
    )

    response = GetInstanceResponse()
    state = OrchestrationState()
    state.instanceId = instance_id
    state.orchestrationStatus = ORCHESTRATION_STATUS_COMPLETED
    response.orchestrationState.CopyFrom(state)

    c = TaskHubGrpcClient()
    c._async_client._stub = AsyncMock()
    c._async_client._stub.WaitForInstanceCompletion.return_value = response
    grpc_timeout = None if timeout is None else timeout
    c.wait_for_orchestration_completion(instance_id, timeout=grpc_timeout)
    # Verify WaitForInstanceCompletion was called with the expected mapped timeout
    c._async_client._stub.WaitForInstanceCompletion.assert_called_once()
    _, kwargs = c._async_client._stub.WaitForInstanceCompletion.call_args
    if timeout is None or timeout == 0:
        assert kwargs.get("timeout") is None
    else:
        assert kwargs.get("timeout") == timeout
```
```python
)
from durabletask.aio.client import (
    AsyncTaskHubGrpcClient,
    OrchestrationState,
```

The sync client no longer exports OrchestrationStatus and OrchestrationFailedError, which are part of the public API. Examples in the repository use client.OrchestrationStatus.COMPLETED, and this change will break existing code. These classes should be re-exported from durabletask.aio.client at the module level to maintain backward compatibility.

Suggested change:

```diff
 from durabletask.aio.client import (
     AsyncTaskHubGrpcClient,
     OrchestrationState,
+    OrchestrationStatus,
+    OrchestrationFailedError,
```
```python
        # Best-effort cleanup in case the user didn't call close() or use context manager
        try:
            self.close()
        except Exception:
```

Using `__del__` for cleanup with blocking operations is problematic. `__del__` may run during interpreter shutdown, when the event loop and other resources can already be unavailable, potentially causing deadlocks or hangs. close() makes blocking calls through the portal, which may not be safe during garbage collection. Consider removing `__del__` and relying on context managers or explicit close() calls, or at minimum verify the portal is still valid before attempting cleanup.

Suggested change:

```python
        # Avoid blocking operations during garbage collection; just log if not properly closed.
        try:
            async_client = getattr(self, "_async_client", None)
            portal = getattr(self, "_portal", None)
            portal_cm = getattr(self, "_portal_cm", None)
            if async_client is not None or portal is not None or portal_cm is not None:
                logger = logging.getLogger(__name__)
                logger.warning(
                    "TaskHubGrpcClient was not closed explicitly; resources may not be fully released. "
                    "Use 'with TaskHubGrpcClient(...) as client:' or call 'client.close()' explicitly."
                )
        except Exception:
            # Suppress all exceptions in destructor
            pass
```
```python
        self._portal_cm = start_blocking_portal()
        self._portal = self._portal_cm.__enter__()
        self._async_client = self._portal.call(self._create_async_client)
```

The portal setup should handle exceptions during async client creation. If _create_async_client() raises an exception, the portal will remain open but self._async_client will not be set, which will cause issues in the close() method. Wrap the async client creation in a try-except block and ensure proper portal cleanup on failure.

Suggested change:

```python
        # Initialize portal-related attributes to safe defaults
        self._portal_cm = None
        self._portal = None
        self._async_client = None
        # Set up the blocking portal and async client, ensuring cleanup on failure
        self._portal_cm = start_blocking_portal()
        self._portal = self._portal_cm.__enter__()
        try:
            self._async_client = self._portal.call(self._create_async_client)
        except Exception:
            # If async client creation fails, make sure to close the portal
            try:
                if self._portal_cm is not None:
                    self._portal_cm.__exit__(None, None, None)
            finally:
                self._portal = None
                self._portal_cm = None
            raise
```
```python
        self._portal_cm = start_blocking_portal()
        self._portal = self._portal_cm.__enter__()
        self._async_client = self._portal.call(self._create_async_client)
```

The synchronous wrapper uses start_blocking_portal(), which spawns a new thread with its own event loop. This means every TaskHubGrpcClient instance creates its own background thread, which could lead to scalability issues if many clients are created. Consider documenting this behavior in the class docstring, and potentially adding a warning when multiple instances are created to help users understand the resource implications.
```diff
@@ -42,7 +43,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
     assert state.name == task.get_name(empty_orchestrator)
     assert state.instance_id == id
     assert state.failure_details is None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.serialized_input is None
     assert state.serialized_output is None
     assert state.serialized_custom_status is None
@@ -73,7 +74,7 @@ def sequence(ctx: task.OrchestrationContext, start_val: int):
     assert state is not None
     assert state.name == task.get_name(sequence)
     assert state.instance_id == id
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.failure_details is None
     assert state.serialized_input == json.dumps(1)
     assert state.serialized_output == json.dumps([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
@@ -117,7 +118,7 @@ def orchestrator(ctx: task.OrchestrationContext, input: int):
     assert state is not None
     assert state.name == task.get_name(orchestrator)
     assert state.instance_id == id
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.serialized_output == json.dumps("Kah-BOOOOM!!!")
     assert state.failure_details is None
     assert state.serialized_custom_status is None
@@ -157,7 +158,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)

     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.failure_details is None
     assert activity_counter == 30

@@ -183,7 +184,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)

     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.serialized_output == json.dumps(["a", "b", "c"])

@@ -211,7 +212,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)

     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     if raise_event:
         assert state.serialized_output == json.dumps("approved")
     else:
@@ -235,11 +236,11 @@ def orchestrator(ctx: task.OrchestrationContext, _):

     # Suspend the orchestration and wait for it to go into the SUSPENDED state
     task_hub_client.suspend_orchestration(id)
-    while state.runtime_status == client.OrchestrationStatus.RUNNING:
+    while state.runtime_status == OrchestrationStatus.RUNNING:
         time.sleep(0.1)
         state = task_hub_client.get_orchestration_state(id)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.SUSPENDED
+    assert state.runtime_status == OrchestrationStatus.SUSPENDED

     # Raise an event to the orchestration and confirm that it does NOT complete
     task_hub_client.raise_orchestration_event(id, "my_event", data=42)
@@ -253,7 +254,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     task_hub_client.resume_orchestration(id)
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.serialized_output == json.dumps(42)

@@ -271,12 +272,12 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     id = task_hub_client.schedule_new_orchestration(orchestrator)
     state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.RUNNING
+    assert state.runtime_status == OrchestrationStatus.RUNNING

     task_hub_client.terminate_orchestration(id, output="some reason for termination")
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.TERMINATED
+    assert state.runtime_status == OrchestrationStatus.TERMINATED
     assert state.serialized_output == json.dumps("some reason for termination")

@@ -320,7 +321,7 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
     metadata = task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30)

     assert metadata is not None
-    assert metadata.runtime_status == client.OrchestrationStatus.TERMINATED
+    assert metadata.runtime_status == OrchestrationStatus.TERMINATED
     assert metadata.serialized_output == f'"{output}"'

     time.sleep(delay_time)
@@ -365,7 +366,7 @@ def orchestrator(ctx: task.OrchestrationContext, input: int):

     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.serialized_output == json.dumps(all_results)
     assert state.serialized_input == json.dumps(4)
     assert all_results == [1, 2, 3, 4, 5]
@@ -401,7 +402,7 @@ def orchestrator(ctx: task.OrchestrationContext, counter: int):

     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED

     output = json.loads(state.serialized_output)
     # Should have called activity 3 times with input values 1, 2, 3
@@ -459,7 +460,7 @@ def throw_activity_with_retry(ctx: task.ActivityContext, _):
     id = task_hub_client.schedule_new_orchestration(parent_orchestrator_with_retry)
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.FAILED
+    assert state.runtime_status == OrchestrationStatus.FAILED
     assert state.failure_details is not None
     assert state.failure_details.error_type == "TaskFailedError"
     assert state.failure_details.message.startswith("Sub-orchestration task #1 failed:")
@@ -500,7 +501,7 @@ def throw_activity(ctx: task.ActivityContext, _):
     id = task_hub_client.schedule_new_orchestration(mock_orchestrator)
     state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
     assert state is not None
-    assert state.runtime_status == client.OrchestrationStatus.FAILED
+    assert state.runtime_status == OrchestrationStatus.FAILED
     assert state.failure_details is not None
     assert state.failure_details.error_type == "TaskFailedError"
     assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
@@ -525,7 +526,7 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
     assert state.name == task.get_name(empty_orchestrator)
     assert state.instance_id == id
     assert state.failure_details is None
-    assert state.runtime_status == client.OrchestrationStatus.COMPLETED
+    assert state.runtime_status == OrchestrationStatus.COMPLETED
     assert state.serialized_input is None
     assert state.serialized_output is None
     assert state.serialized_custom_status == '"foobaz"'
```
The sync e2e tests create TaskHubGrpcClient instances but never call close() on them. Since the new implementation uses a portal with a background event loop thread, these resources will leak. Consider using context managers (with statements) or explicitly calling close() at the end of each test to properly clean up resources.
```python
    async def _create_async_client(self) -> AsyncTaskHubGrpcClient:
        return AsyncTaskHubGrpcClient(
            host_address=self._host_address,
            metadata=self._metadata,
            log_handler=self._log_handler,
            log_formatter=self._log_formatter,
            secure_channel=self._secure_channel,
            interceptors=self._interceptors,
            channel_options=self._channel_options,
        )
```

The async method _create_async_client is called from `__init__` via portal.call(), but it doesn't need to be async, since it only constructs an object and performs no awaitable operations (AsyncTaskHubGrpcClient's `__init__` is synchronous). Making it a plain function would simplify the code and avoid the overhead of scheduling a coroutine on the event loop.
```python
    def close(self):
        if self._async_client is not None:
            try:
                self._portal.call(self._async_client.aclose)
            except Exception:
                pass
            self._async_client = None
        if self._portal is not None:
            try:
                self._portal_cm.__exit__(None, None, None)
            except Exception:
                pass
            self._portal = None
            self._portal_cm = None
```

close() checks whether self._async_client is not None, but doesn't guard against the attribute never having been set. If `__init__` fails before assigning self._async_client, calling close() on the partially constructed instance raises AttributeError. Consider adding a hasattr check, or initializing self._async_client = None before the portal setup.
```diff
+        return self._portal.call(  # type: ignore
+            partial(
+                self._async_client.schedule_new_orchestration,  # type: ignore
+                orchestrator,
+                input=input,
+                instance_id=instance_id,
+                start_at=start_at,
+                reuse_id_policy=reuse_id_policy,
+            )
+        )
-        self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
-        res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
-        return res.instanceId

     def get_orchestration_state(
         self, instance_id: str, *, fetch_payloads: bool = True
     ) -> Optional[OrchestrationState]:
-        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        res: pb.GetInstanceResponse = self._stub.GetInstance(req)
-        return new_orchestration_state(req.instanceId, res)
+        return self._portal.call(  # type: ignore
+            partial(
+                self._async_client.get_orchestration_state,  # type: ignore
+                instance_id,
+                fetch_payloads=fetch_payloads,
+            )
+        )

     def wait_for_orchestration_start(
         self, instance_id: str, *, fetch_payloads: bool = False, timeout: int = 0
     ) -> Optional[OrchestrationState]:
-        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        try:
-            grpc_timeout = None if timeout == 0 else timeout
-            self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to start."
-            )
+        return self._portal.call(  # type: ignore
+            partial(
+                self._async_client.wait_for_orchestration_start,  # type: ignore
+                instance_id,
+                fetch_payloads=fetch_payloads,
+                timeout=timeout,
+            )
+        )
-            res: pb.GetInstanceResponse = self._stub.WaitForInstanceStart(req, timeout=grpc_timeout)
-            return new_orchestration_state(req.instanceId, res)
-        except grpc.RpcError as rpc_error:
-            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
-                # Replace gRPC error with the built-in TimeoutError
-                raise TimeoutError("Timed-out waiting for the orchestration to start")
-            else:
-                raise

     def wait_for_orchestration_completion(
         self, instance_id: str, *, fetch_payloads: bool = True, timeout: int = 0
     ) -> Optional[OrchestrationState]:
-        req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        try:
-            grpc_timeout = None if timeout == 0 else timeout
-            self._logger.info(
-                f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
-            )
+        return self._portal.call(  # type: ignore
+            partial(
+                self._async_client.wait_for_orchestration_completion,  # type: ignore
+                instance_id,
+                fetch_payloads=fetch_payloads,
+                timeout=timeout,
+            )
+        )
-            res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
-                req, timeout=grpc_timeout
-            )
-            state = new_orchestration_state(req.instanceId, res)
-            if not state:
-                return None
-
-            if (
-                state.runtime_status == OrchestrationStatus.FAILED
-                and state.failure_details is not None
-            ):
-                details = state.failure_details
-                self._logger.info(
-                    f"Instance '{instance_id}' failed: [{details.error_type}] {details.message}"
-                )
-            elif state.runtime_status == OrchestrationStatus.TERMINATED:
-                self._logger.info(f"Instance '{instance_id}' was terminated.")
-            elif state.runtime_status == OrchestrationStatus.COMPLETED:
-                self._logger.info(f"Instance '{instance_id}' completed.")
-
-            return state
-        except grpc.RpcError as rpc_error:
-            if rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:  # type: ignore
-                # Replace gRPC error with the built-in TimeoutError
-                raise TimeoutError("Timed-out waiting for the orchestration to complete")
-            else:
-                raise

     def raise_orchestration_event(
         self, instance_id: str, event_name: str, *, data: Optional[Any] = None
     ):
-        req = pb.RaiseEventRequest(
-            instanceId=instance_id,
-            name=event_name,
-            input=wrappers_pb2.StringValue(value=shared.to_json(data)) if data else None,
-        )
+        self._portal.call(  # type: ignore
+            partial(
+                self._async_client.raise_orchestration_event,  # type: ignore
+                instance_id,
+                event_name,
+                data=data,
+            )
+        )
-        self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
-        self._stub.RaiseEvent(req)

     def terminate_orchestration(
         self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
     ):
-        req = pb.TerminateRequest(
-            instanceId=instance_id,
-            output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None,
-            recursive=recursive,
-        )
+        self._portal.call(  # type: ignore
+            partial(
+                self._async_client.terminate_orchestration,  # type: ignore
+                instance_id,
+                output=output,
+                recursive=recursive,
+            )
+        )
-        self._logger.info(f"Terminating instance '{instance_id}'.")
-        self._stub.TerminateInstance(req)

     def suspend_orchestration(self, instance_id: str):
-        req = pb.SuspendRequest(instanceId=instance_id)
-        self._logger.info(f"Suspending instance '{instance_id}'.")
-        self._stub.SuspendInstance(req)
+        self._portal.call(self._async_client.suspend_orchestration, instance_id)  # type: ignore

     def resume_orchestration(self, instance_id: str):
-        req = pb.ResumeRequest(instanceId=instance_id)
-        self._logger.info(f"Resuming instance '{instance_id}'.")
-        self._stub.ResumeInstance(req)
+        self._portal.call(self._async_client.resume_orchestration, instance_id)  # type: ignore

     def purge_orchestration(self, instance_id: str, recursive: bool = True):
-        req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
-        self._logger.info(f"Purging instance '{instance_id}'.")
-        self._stub.PurgeInstances(req)
+        self._portal.call(  # type: ignore
+            partial(
+                self._async_client.purge_orchestration,  # type: ignore
+                instance_id,
+                recursive=recursive,
+            )
+        )
```
There are numerous type: ignore comments throughout the wrapper methods. This suggests type checking issues with the portal.call() pattern. Consider adding proper type hints to _portal and _async_client attributes, or using typing.cast() where appropriate to make the types explicit, which would improve code maintainability and IDE support.
This PR introduces anyio to reduce duplication between the clients. The sync client is now just a wrapper, using the async client behind the scenes.

I kept all the functions explicitly defined in the sync client so that code editors can show the tips to users. We could generate those functions dynamically and create a `.pyi` file, but I think it's manageable and simpler to duplicate them.

Opening as draft to get feedback on this approach.
.pyifile, but I think it's manageable and simpler to duplicate them.Opening as draft to get feedback on this approach.