Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,4 @@ cython_debug/
#.idea/

openapitools.json
.python-version
209 changes: 92 additions & 117 deletions hatchet_sdk/clients/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ async def run_workflow(
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
except grpc.RpcError as e:
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())

raise ValueError(f"gRPC error: {e}")
raise e

@tenacity_retry
async def run_workflows(
Expand All @@ -266,56 +266,49 @@ async def run_workflows(
) -> List[WorkflowRunRef]:
if len(workflows) == 0:
raise ValueError("No workflows to run")
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)

namespace = self.namespace
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)

if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options["namespace"]
del options["namespace"]
namespace = self.namespace

workflow_run_requests: TriggerWorkflowRequest = []
if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options["namespace"]
del options["namespace"]

for workflow in workflows:
workflow_name = workflow["workflow_name"]
input_data = workflow["input"]
options = workflow["options"]
workflow_run_requests: TriggerWorkflowRequest = []

if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"
for workflow in workflows:
workflow_name = workflow["workflow_name"]
input_data = workflow["input"]
options = workflow["options"]

# Prepare and trigger workflow for each workflow name and input
request = self._prepare_workflow_request(
workflow_name, input_data, options
)
workflow_run_requests.append(request)
if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"

request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)
# Prepare and trigger workflow for each workflow name and input
request = self._prepare_workflow_request(workflow_name, input_data, options)
workflow_run_requests.append(request)

resp: BulkTriggerWorkflowResponse = (
await self.aio_client.BulkTriggerWorkflow(
request,
metadata=get_metadata(self.token),
)
)
request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)

return [
WorkflowRunRef(
workflow_run_id=workflow_run_id,
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
for workflow_run_id in resp.workflow_run_ids
]
resp: BulkTriggerWorkflowResponse = await self.aio_client.BulkTriggerWorkflow(
request,
metadata=get_metadata(self.token),
)

except grpc.RpcError as e:
raise ValueError(f"gRPC error: {e}")
return [
WorkflowRunRef(
workflow_run_id=workflow_run_id,
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
for workflow_run_id in resp.workflow_run_ids
]

@tenacity_retry
async def put_workflow(
Expand All @@ -324,15 +317,12 @@ async def put_workflow(
workflow: CreateWorkflowVersionOpts | WorkflowMeta,
overrides: CreateWorkflowVersionOpts | None = None,
) -> WorkflowVersion:
try:
opts = self._prepare_put_workflow_request(name, workflow, overrides)
opts = self._prepare_put_workflow_request(name, workflow, overrides)

return await self.aio_client.PutWorkflow(
opts,
metadata=get_metadata(self.token),
)
except grpc.RpcError as e:
raise ValueError(f"Could not put workflow: {e}")
return await self.aio_client.PutWorkflow(
opts,
metadata=get_metadata(self.token),
)

@tenacity_retry
async def put_rate_limit(
Expand All @@ -341,17 +331,14 @@ async def put_rate_limit(
limit: int,
duration: RateLimitDuration = RateLimitDuration.SECOND,
):
try:
await self.aio_client.PutRateLimit(
PutRateLimitRequest(
key=key,
limit=limit,
duration=duration,
),
metadata=get_metadata(self.token),
)
except grpc.RpcError as e:
raise ValueError(f"Could not put rate limit: {e}")
await self.aio_client.PutRateLimit(
PutRateLimitRequest(
key=key,
limit=limit,
duration=duration,
),
metadata=get_metadata(self.token),
)

@tenacity_retry
async def schedule_workflow(
Expand Down Expand Up @@ -383,11 +370,11 @@ async def schedule_workflow(
request,
metadata=get_metadata(self.token),
)
except grpc.RpcError as e:
except (grpc.aio.AioRpcError, grpc.RpcError) as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())

raise ValueError(f"gRPC error: {e}")
raise e


class AdminClient(AdminClientBase):
Expand All @@ -408,17 +395,14 @@ def put_workflow(
workflow: CreateWorkflowVersionOpts | WorkflowMeta,
overrides: CreateWorkflowVersionOpts | None = None,
) -> WorkflowVersion:
try:
opts = self._prepare_put_workflow_request(name, workflow, overrides)
opts = self._prepare_put_workflow_request(name, workflow, overrides)

resp: WorkflowVersion = self.client.PutWorkflow(
opts,
metadata=get_metadata(self.token),
)
resp: WorkflowVersion = self.client.PutWorkflow(
opts,
metadata=get_metadata(self.token),
)

return resp
except grpc.RpcError as e:
raise ValueError(f"Could not put workflow: {e}")
return resp

@tenacity_retry
def put_rate_limit(
Expand All @@ -427,17 +411,14 @@ def put_rate_limit(
limit: int,
duration: Union[RateLimitDuration.Value, str] = RateLimitDuration.SECOND,
):
try:
self.client.PutRateLimit(
PutRateLimitRequest(
key=key,
limit=limit,
duration=duration,
),
metadata=get_metadata(self.token),
)
except grpc.RpcError as e:
raise ValueError(f"Could not put rate limit: {e}")
self.client.PutRateLimit(
PutRateLimitRequest(
key=key,
limit=limit,
duration=duration,
),
metadata=get_metadata(self.token),
)

@tenacity_retry
def schedule_workflow(
Expand Down Expand Up @@ -469,11 +450,11 @@ def schedule_workflow(
request,
metadata=get_metadata(self.token),
)
except grpc.RpcError as e:
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())

raise ValueError(f"gRPC error: {e}")
raise e

## TODO: `options` is treated as a dict (wrong type hint)
## TODO: `any` type hint should come from `typing`
Expand Down Expand Up @@ -541,55 +522,49 @@ def run_workflow(
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
except grpc.RpcError as e:
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())

raise ValueError(f"gRPC error: {e}")
raise e

@tenacity_retry
def run_workflows(
self, workflows: List[WorkflowRunDict], options: TriggerWorkflowOptions = None
) -> list[WorkflowRunRef]:
workflow_run_requests: TriggerWorkflowRequest = []
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)

for workflow in workflows:
workflow_name = workflow["workflow_name"]
input_data = workflow["input"]
options = workflow["options"]
for workflow in workflows:
workflow_name = workflow["workflow_name"]
input_data = workflow["input"]
options = workflow["options"]

namespace = self.namespace

if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options["namespace"]
del options["namespace"]
namespace = self.namespace

if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"
if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options["namespace"]
del options["namespace"]

# Prepare and trigger workflow for each workflow name and input
request = self._prepare_workflow_request(
workflow_name, input_data, options
)
if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"

workflow_run_requests.append(request)
# Prepare and trigger workflow for each workflow name and input
request = self._prepare_workflow_request(workflow_name, input_data, options)

request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)
workflow_run_requests.append(request)

resp: BulkTriggerWorkflowResponse = self.client.BulkTriggerWorkflow(
request,
metadata=get_metadata(self.token),
)
request = BulkTriggerWorkflowRequest(workflows=workflow_run_requests)

except grpc.RpcError as e:
raise ValueError(f"gRPC error: {e}")
resp: BulkTriggerWorkflowResponse = self.client.BulkTriggerWorkflow(
request,
metadata=get_metadata(self.token),
)

return [
WorkflowRunRef(
Expand Down
15 changes: 4 additions & 11 deletions hatchet_sdk/clients/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ def push(self, event_key, payload, options: PushEventOptions = None) -> Event:

span.add_event("Pushing event", attributes={"key": namespaced_event_key})

try:
return self.client.Push(request, metadata=get_metadata(self.token))
except grpc.RpcError as e:
raise ValueError(f"gRPC error: {e}")
return self.client.Push(request, metadata=get_metadata(self.token))

@tenacity_retry
def bulk_push(
Expand Down Expand Up @@ -188,13 +185,9 @@ def bulk_push(
bulk_request = BulkPushEventRequest(events=bulk_events)

span.add_event("Pushing bulk events")
try:
response = self.client.BulkPush(
bulk_request, metadata=get_metadata(self.token)
)
return response.events
except grpc.RpcError as e:
raise ValueError(f"gRPC error: {e}")
response = self.client.BulkPush(bulk_request, metadata=get_metadata(self.token))

return response.events

def log(self, message: str, step_run_id: str):
try:
Expand Down
4 changes: 2 additions & 2 deletions hatchet_sdk/clients/rest/tenacity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def tenacity_alert_retry(retry_state: tenacity.RetryCallState) -> None:


def tenacity_should_retry(ex: Exception) -> bool:
if isinstance(ex, grpc.aio.AioRpcError):
if ex.code in [
if isinstance(ex, (grpc.aio.AioRpcError, grpc.RpcError)):
if ex.code() in [
grpc.StatusCode.UNIMPLEMENTED,
grpc.StatusCode.NOT_FOUND,
]:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.43.2"
version = "0.43.3"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"
Expand Down
Loading