From 9af59093545c08caf48a3a0359a75f669905bf31 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Thu, 16 Jan 2025 16:17:39 -0500 Subject: [PATCH 1/5] fix: retry logic in tenacity utils --- hatchet_sdk/clients/rest/tenacity_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hatchet_sdk/clients/rest/tenacity_utils.py b/hatchet_sdk/clients/rest/tenacity_utils.py index 55fe6d18..377266a1 100644 --- a/hatchet_sdk/clients/rest/tenacity_utils.py +++ b/hatchet_sdk/clients/rest/tenacity_utils.py @@ -28,8 +28,8 @@ def tenacity_alert_retry(retry_state: tenacity.RetryCallState) -> None: def tenacity_should_retry(ex: Exception) -> bool: - if isinstance(ex, grpc.aio.AioRpcError): - if ex.code in [ + if isinstance(ex, (grpc.aio.AioRpcError, grpc.RpcError)): + if ex.code() in [ grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.NOT_FOUND, ]: From 14a24e65ca771865ace58006128565ef800caa4f Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Thu, 16 Jan 2025 16:17:45 -0500 Subject: [PATCH 2/5] drive-by: ignore python version --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 6b15a2af..a8fca96c 100644 --- a/.gitignore +++ b/.gitignore @@ -162,3 +162,4 @@ cython_debug/ #.idea/ openapitools.json +.python-version From ab75228303410a05e22791b1a746a0e24b89f952 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Thu, 16 Jan 2025 16:21:57 -0500 Subject: [PATCH 3/5] fix: remaining error handling issues --- hatchet_sdk/clients/admin.py | 207 +++++++++++++++------------------- hatchet_sdk/clients/events.py | 15 +-- 2 files changed, 95 insertions(+), 127 deletions(-) diff --git a/hatchet_sdk/clients/admin.py b/hatchet_sdk/clients/admin.py index 02fdeb56..65cbebb4 100644 --- a/hatchet_sdk/clients/admin.py +++ b/hatchet_sdk/clients/admin.py @@ -256,7 +256,7 @@ async def run_workflow( if e.code() == grpc.StatusCode.ALREADY_EXISTS: raise DedupeViolationErr(e.details()) - raise ValueError(f"gRPC error: {e}") + raise e @tenacity_retry async def run_workflows( @@ -266,56 +266,49 @@ async def run_workflows( ) -> List[WorkflowRunRef]: if len(workflows) == 0: raise ValueError("No workflows to run") - try: - if not self.pooled_workflow_listener: - self.pooled_workflow_listener = PooledWorkflowRunListener(self.config) - namespace = self.namespace + if not self.pooled_workflow_listener: + self.pooled_workflow_listener = PooledWorkflowRunListener(self.config) - if ( - options is not None - and "namespace" in options - and options["namespace"] is not None - ): - namespace = options["namespace"] - del options["namespace"] + namespace = self.namespace - workflow_run_requests: TriggerWorkflowRequest = [] + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): + namespace = options["namespace"] + del options["namespace"] - for workflow in workflows: - workflow_name = workflow["workflow_name"] - input_data = workflow["input"] - options = workflow["options"] + workflow_run_requests: TriggerWorkflowRequest = [] - if namespace != "" and not workflow_name.startswith(self.namespace): - workflow_name = f"{namespace}{workflow_name}" + for workflow in workflows: + workflow_name = workflow["workflow_name"] + input_data = workflow["input"] + options = workflow["options"] - # Prepare and trigger workflow for each workflow name and input - request = self._prepare_workflow_request( - workflow_name, input_data, options - ) - workflow_run_requests.append(request) + if namespace != "" and not workflow_name.startswith(self.namespace): + workflow_name = f"{namespace}{workflow_name}" - request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests) + # Prepare and trigger workflow for each workflow name and input + request = self._prepare_workflow_request(workflow_name, input_data, options) + workflow_run_requests.append(request) - resp: BulkTriggerWorkflowResponse = ( - await self.aio_client.BulkTriggerWorkflow( - request, - metadata=get_metadata(self.token), - ) - ) + request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests) - return [ - WorkflowRunRef( - workflow_run_id=workflow_run_id, - workflow_listener=self.pooled_workflow_listener, - workflow_run_event_listener=self.listener_client, - ) - for workflow_run_id in resp.workflow_run_ids - ] + resp: BulkTriggerWorkflowResponse = await self.aio_client.BulkTriggerWorkflow( + request, + metadata=get_metadata(self.token), + ) - except grpc.RpcError as e: - raise ValueError(f"gRPC error: {e}") + return [ + WorkflowRunRef( + workflow_run_id=workflow_run_id, + workflow_listener=self.pooled_workflow_listener, + workflow_run_event_listener=self.listener_client, + ) + for workflow_run_id in resp.workflow_run_ids + ] @tenacity_retry async def put_workflow( @@ -324,15 +317,12 @@ async def put_workflow( workflow: CreateWorkflowVersionOpts | WorkflowMeta, overrides: CreateWorkflowVersionOpts | None = None, ) -> WorkflowVersion: - try: - opts = self._prepare_put_workflow_request(name, workflow, overrides) + opts = self._prepare_put_workflow_request(name, workflow, overrides) - return await self.aio_client.PutWorkflow( - opts, - metadata=get_metadata(self.token), - ) - except grpc.RpcError as e: - raise ValueError(f"Could not put workflow: {e}") + return await self.aio_client.PutWorkflow( + opts, + metadata=get_metadata(self.token), + ) @tenacity_retry async def put_rate_limit( @@ -341,17 +331,14 @@ async def put_rate_limit( limit: int, duration: RateLimitDuration = RateLimitDuration.SECOND, ): - try: - await self.aio_client.PutRateLimit( - PutRateLimitRequest( - key=key, - limit=limit, - duration=duration, - ), - metadata=get_metadata(self.token), - ) - except grpc.RpcError as e: - raise ValueError(f"Could not put rate limit: {e}") + await self.aio_client.PutRateLimit( + PutRateLimitRequest( + key=key, + limit=limit, + duration=duration, + ), + metadata=get_metadata(self.token), + ) @tenacity_retry async def schedule_workflow( @@ -383,11 +370,11 @@ async def schedule_workflow( request, metadata=get_metadata(self.token), ) - except grpc.RpcError as e: + except (grpc.aio.AioRpcError, grpc.RpcError) as e: if e.code() == grpc.StatusCode.ALREADY_EXISTS: raise DedupeViolationErr(e.details()) - raise ValueError(f"gRPC error: {e}") + raise e class AdminClient(AdminClientBase): @@ -408,17 +395,14 @@ def put_workflow( workflow: CreateWorkflowVersionOpts | WorkflowMeta, overrides: CreateWorkflowVersionOpts | None = None, ) -> WorkflowVersion: - try: - opts = self._prepare_put_workflow_request(name, workflow, overrides) + opts = self._prepare_put_workflow_request(name, workflow, overrides) - resp: WorkflowVersion = self.client.PutWorkflow( - opts, - metadata=get_metadata(self.token), - ) + resp: WorkflowVersion = self.client.PutWorkflow( + opts, + metadata=get_metadata(self.token), + ) - return resp - except grpc.RpcError as e: - raise ValueError(f"Could not put workflow: {e}") + return resp @tenacity_retry def put_rate_limit( @@ -427,17 +411,14 @@ def put_rate_limit( limit: int, duration: Union[RateLimitDuration.Value, str] = RateLimitDuration.SECOND, ): - try: - self.client.PutRateLimit( - PutRateLimitRequest( - key=key, - limit=limit, - duration=duration, - ), - metadata=get_metadata(self.token), - ) - except grpc.RpcError as e: - raise ValueError(f"Could not put rate limit: {e}") + self.client.PutRateLimit( + PutRateLimitRequest( + key=key, + limit=limit, + duration=duration, + ), + metadata=get_metadata(self.token), + ) @tenacity_retry def schedule_workflow( @@ -469,11 +450,11 @@ def schedule_workflow( request, metadata=get_metadata(self.token), ) - except grpc.RpcError as e: + except (grpc.RpcError, grpc.aio.AioRpcError) as e: if e.code() == grpc.StatusCode.ALREADY_EXISTS: raise DedupeViolationErr(e.details()) - raise ValueError(f"gRPC error: {e}") + raise e ## TODO: `options` is treated as a dict (wrong type hint) ## TODO: `any` type hint should come from `typing` @@ -541,55 +522,49 @@ def run_workflow( workflow_listener=self.pooled_workflow_listener, workflow_run_event_listener=self.listener_client, ) - except grpc.RpcError as e: + except (grpc.RpcError, grpc.aio.AioRpcError) as e: if e.code() == grpc.StatusCode.ALREADY_EXISTS: raise DedupeViolationErr(e.details()) - raise ValueError(f"gRPC error: {e}") + raise e @tenacity_retry def run_workflows( self, workflows: List[WorkflowRunDict], options: TriggerWorkflowOptions = None ) -> list[WorkflowRunRef]: workflow_run_requests: TriggerWorkflowRequest = [] - try: - if not self.pooled_workflow_listener: - self.pooled_workflow_listener = PooledWorkflowRunListener(self.config) + if not self.pooled_workflow_listener: + self.pooled_workflow_listener = PooledWorkflowRunListener(self.config) - for workflow in workflows: - workflow_name = workflow["workflow_name"] - input_data = workflow["input"] - options = workflow["options"] + for workflow in workflows: + workflow_name = workflow["workflow_name"] + input_data = workflow["input"] + options = workflow["options"] - namespace = self.namespace - - if ( - options is not None - and "namespace" in options - and options["namespace"] is not None - ): - namespace = options["namespace"] - del options["namespace"] + namespace = self.namespace - if namespace != "" and not workflow_name.startswith(self.namespace): - workflow_name = f"{namespace}{workflow_name}" + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): + namespace = options["namespace"] + del options["namespace"] - # Prepare and trigger workflow for each workflow name and input - request = self._prepare_workflow_request( - workflow_name, input_data, options - ) + if namespace != "" and not workflow_name.startswith(self.namespace): + workflow_name = f"{namespace}{workflow_name}" - workflow_run_requests.append(request) + # Prepare and trigger workflow for each workflow name and input + request = self._prepare_workflow_request(workflow_name, input_data, options) - request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests) + workflow_run_requests.append(request) - resp: BulkTriggerWorkflowResponse = self.client.BulkTriggerWorkflow( - request, - metadata=get_metadata(self.token), - ) + request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests) - except grpc.RpcError as e: - raise ValueError(f"gRPC error: {e}") + resp: BulkTriggerWorkflowResponse = self.client.BulkTriggerWorkflow( + request, + metadata=get_metadata(self.token), + ) return [ WorkflowRunRef( diff --git a/hatchet_sdk/clients/events.py b/hatchet_sdk/clients/events.py index e188d386..160b780e 100644 --- a/hatchet_sdk/clients/events.py +++ b/hatchet_sdk/clients/events.py @@ -125,10 +125,7 @@ def push(self, event_key, payload, options: PushEventOptions = None) -> Event: span.add_event("Pushing event", attributes={"key": namespaced_event_key}) - try: - return self.client.Push(request, metadata=get_metadata(self.token)) - except grpc.RpcError as e: - raise ValueError(f"gRPC error: {e}") + return self.client.Push(request, metadata=get_metadata(self.token)) @tenacity_retry def bulk_push( @@ -188,13 +185,9 @@ def bulk_push( bulk_request = BulkPushEventRequest(events=bulk_events) span.add_event("Pushing bulk events") - try: - response = self.client.BulkPush( - bulk_request, metadata=get_metadata(self.token) - ) - return response.events - except grpc.RpcError as e: - raise ValueError(f"gRPC error: {e}") + response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token)) + + return response.events def log(self, message: str, step_run_id: str): try: From b6d5664f74f3e6e9a30d0dea5e78c2277373bd4f Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Thu, 16 Jan 2025 16:25:47 -0500 Subject: [PATCH 4/5] chore: version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 69380b67..884f6af3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "0.43.2" +version = "0.43.3" description = "" authors = ["Alexander Belanger "] readme = "README.md" From a2ffaac87c677f456eb93041cbff870568f9d762 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Thu, 16 Jan 2025 16:29:07 -0500 Subject: [PATCH 5/5] fix: catch addtl error type --- hatchet_sdk/clients/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hatchet_sdk/clients/admin.py b/hatchet_sdk/clients/admin.py index 65cbebb4..81662d1b 100644 --- a/hatchet_sdk/clients/admin.py +++ b/hatchet_sdk/clients/admin.py @@ -252,7 +252,7 @@ async def run_workflow( workflow_listener=self.pooled_workflow_listener, workflow_run_event_listener=self.listener_client, ) - except grpc.RpcError as e: + except (grpc.RpcError, grpc.aio.AioRpcError) as e: if e.code() == grpc.StatusCode.ALREADY_EXISTS: raise DedupeViolationErr(e.details())