From f3fa8c6df7846346d7875c3237af29e4bfa9a141 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Wed, 22 Oct 2025 10:16:33 +0200 Subject: [PATCH 1/4] style: typing updates Signed-off-by: Yves Bastide --- simpleflow/swf/executor.py | 22 +++++++++---------- .../swf/mapper/models/decision/__init__.py | 1 + simpleflow/swf/mapper/querysets/workflow.py | 14 +++++++----- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index fdde96d4e..8a4816c49 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -38,6 +38,7 @@ from simpleflow.workflow import Workflow if TYPE_CHECKING: + from simpleflow.activity import NotSet from simpleflow.swf.mapper.models.domain import Domain __all__ = ["Executor"] @@ -199,7 +200,7 @@ def history(self) -> History | None: def _make_task_id( self, - a_task: ActivityTask | WorkflowTask, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, workflow_id: str, run_id: str, *args, @@ -502,7 +503,7 @@ def find_timer_event(self, a_task: TimerTask | CancelTimerTask, history: History return None return event - TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, callable]] = { + TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, Callable]] = { ActivityTask: find_activity_event, WorkflowTask: find_child_workflow_event, SignalTask: find_signal_event, @@ -771,7 +772,7 @@ def _add_start_timer_decision(self, id, timeout=0): def resume( self, - a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, *args, **kwargs, ) -> futures.Future: @@ -843,7 +844,7 @@ def resume( def make_task_id( self, - a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, *args, **kwargs, ) -> None: @@ -857,7 +858,11 @@ def make_task_id( workflow_id, run_id = self._workflow_id, self._run_id a_task.id = self._make_task_id(a_task, workflow_id, run_id, *args, **kwargs) - def _compute_priority(self, priority_set_on_submit, a_task): + def _compute_priority( + self, + priority_set_on_submit: str | int | NotSet, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, + ) -> str | int | None: """ Computes the correct task priority, with the following precedence (first is better/preferred): @@ -866,14 +871,7 @@ def _compute_priority(self, priority_set_on_submit, a_task): - priority set on the workflow execution - None otherwise - :param priority_set_on_submit: - :type priority_set_on_submit: str|int|PRIORITY_NOT_SET - - :param a_task: - :type a_task: ActivityTask|WorkflowTask - :returns: the priority for this task - :rtype: str|int|None """ if priority_set_on_submit is not PRIORITY_NOT_SET: return priority_set_on_submit diff --git a/simpleflow/swf/mapper/models/decision/__init__.py b/simpleflow/swf/mapper/models/decision/__init__.py index f9a01dbe6..433056584 100644 --- a/simpleflow/swf/mapper/models/decision/__init__.py +++ b/simpleflow/swf/mapper/models/decision/__init__.py @@ -1,3 +1,4 @@ +from simpleflow.swf.mapper.models.decision.base import Decision # NOQA from simpleflow.swf.mapper.models.decision.marker import MarkerDecision # NOQA from simpleflow.swf.mapper.models.decision.task import ActivityTaskDecision # NOQA from simpleflow.swf.mapper.models.decision.timer import TimerDecision # NOQA diff --git a/simpleflow/swf/mapper/querysets/workflow.py b/simpleflow/swf/mapper/querysets/workflow.py index b19fe5078..022da4eeb 100644 --- a/simpleflow/swf/mapper/querysets/workflow.py +++ b/simpleflow/swf/mapper/querysets/workflow.py @@ -133,9 +133,9 @@ def get(self, name, version, *args, **kwargs): error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) from e + raise DoesNotExistError(message or "") from e - raise ResponseError(message, error_code=error_code) from e + raise ResponseError(message or "", error_code=error_code or "") from e wt_info = response[self._infos] wt_config = response["configuration"] @@ -465,12 +465,14 @@ def to_WorkflowExecution(self, domain: Domain, execution_info: dict[str, Any], * execution_info["workflowType"]["version"], ) + workflow_id: str = get_subkey(execution_info, ["execution", "workflowId"]) # type: ignore + status: str = execution_info.get("executionStatus") # type: ignore return WorkflowExecution( domain, - get_subkey(execution_info, ["execution", "workflowId"]), # workflow_id + workflow_id, run_id=get_subkey(execution_info, ["execution", "runId"]), workflow_type=workflow_type, - status=execution_info.get("executionStatus"), + status=status, close_status=execution_info.get("closeStatus"), tag_list=execution_info.get("tagList"), start_timestamp=execution_info.get("startTimestamp"), @@ -488,9 +490,9 @@ def get(self, workflow_id, run_id, *args, **kwargs): error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) from e + raise DoesNotExistError(message or "") from e - raise ResponseError(message, error_code=error_code) from e + raise ResponseError(message or "", error_code=error_code or "") from e execution_info = response[self._infos] execution_config = response["executionConfiguration"] From ace82aca754ea4536c37bbb65819832432c2e397 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Wed, 22 Oct 2025 10:17:18 +0200 Subject: [PATCH 2/4] feat: TimerTask, CancelTimerTask: `idempotent` arg Signed-off-by: Yves Bastide --- simpleflow/swf/task.py | 18 ++++++++---------- simpleflow/task.py | 12 ++++++++---- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/simpleflow/swf/task.py b/simpleflow/swf/task.py index f272a7a95..ad5425cc1 100644 --- a/simpleflow/swf/task.py +++ b/simpleflow/swf/task.py @@ -374,14 +374,14 @@ def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decisio class TimerTask(task.TimerTask, SwfTask): - idempotent = True - @classmethod def from_generic_task(cls, a_task: task.TimerTask) -> Self: - return cls(a_task.timer_id, a_task.timeout, a_task.control) + return cls(a_task.timer_id, a_task.timeout, a_task.control, a_task.idempotent) - def __init__(self, timer_id: str, timeout: int | str, control: dict[str, Any] | None) -> None: - super().__init__(timer_id, timeout, control) + def __init__( + self, timer_id: str, timeout: int | str, control: dict[str, Any] | None, idempotent: bool = True + ) -> None: + super().__init__(timer_id, timeout, control, idempotent) def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decision.Decision]: decision = simpleflow.swf.mapper.models.decision.TimerDecision( @@ -394,14 +394,12 @@ def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decisio class CancelTimerTask(task.CancelTimerTask, SwfTask): - idempotent = True - @classmethod def from_generic_task(cls, a_task: task.CancelTimerTask) -> Self: - return cls(a_task.timer_id) + return cls(a_task.timer_id, a_task.idempotent) - def __init__(self, timer_id: str) -> None: - super().__init__(timer_id) + def __init__(self, timer_id: str, idempotent: bool = True) -> None: + super().__init__(timer_id, idempotent) def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decision.Decision]: decision = simpleflow.swf.mapper.models.decision.TimerDecision( diff --git a/simpleflow/task.py b/simpleflow/task.py index 322d83229..502880d03 100644 --- a/simpleflow/task.py +++ b/simpleflow/task.py @@ -241,10 +241,13 @@ class TimerTask(Task): Timer. """ - def __init__(self, timer_id: str, timeout: str | int, control: dict[str, Any] | None = None) -> None: + def __init__( + self, timer_id: str, timeout: str | int, control: dict[str, Any] | None = None, idempotent: bool = True + ) -> None: self.timer_id = timer_id self.timeout = timeout self.control = control + self.idempotent = idempotent self.args: tuple = () self.kwargs: dict[str, Any] = {} @@ -257,7 +260,7 @@ def id(self) -> str: return self.timer_id def __repr__(self) -> str: - return f'<{self.__class__.__name__} timer_id="{self.timer_id}" timeout={self.timeout}>' + return f'<{self.__class__.__name__} timer_id="{self.timer_id}" timeout={self.timeout} idempotent={self.idempotent}>' def execute(self): # Local execution @@ -271,8 +274,9 @@ class CancelTimerTask(Task): Timer cancellation. """ - def __init__(self, timer_id: str) -> None: + def __init__(self, timer_id: str, idempotent: bool = True) -> None: self.timer_id = timer_id + self.idempotent = idempotent self.args: tuple = () self.kwargs: dict[str, Any] = {} @@ -285,7 +289,7 @@ def id(self) -> str: return self.timer_id def __repr__(self) -> str: - return f'<{self.__class__.__name__} timer_id="{self.timer_id}">' + return f'<{self.__class__.__name__} timer_id="{self.timer_id} idempotent={self.idempotent}">' def execute(self) -> None: # Local execution: no-op From 860906a6e17bbcb7de0ba73d3b007612b6a8db8a Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Wed, 22 Oct 2025 11:02:55 +0200 Subject: [PATCH 3/4] fix: _get_future_from_timer_event The timer id is `id`, not `timer_id`. Signed-off-by: Yves Bastide --- simpleflow/swf/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index 8a4816c49..efc8967cb 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -429,7 +429,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any]) future = futures.Future() if not event: return future - state = event["state"] + state: str = event["state"] if state == "started": future.set_running() elif state == "fired": @@ -439,7 +439,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any]) elif state in ("start_failed", "cancel_failed"): future.set_exception( exceptions.TaskFailed( - name=event["timer_id"], + name=event["id"], reason=event["cause"], ) ) From 34314f94064b44e03c14f0e50bfb36c7708746d5 Mon Sep 17 00:00:00 2001 From: Yves Bastide Date: Wed, 22 Oct 2025 11:31:53 +0200 Subject: [PATCH 4/4] fix: cancel_failed: cause Signed-off-by: Yves Bastide --- simpleflow/history.py | 1 + 1 file changed, 1 insertion(+) diff --git a/simpleflow/history.py b/simpleflow/history.py index 9eae9a046..424f34d9a 100644 --- a/simpleflow/history.py +++ b/simpleflow/history.py @@ -760,6 +760,7 @@ def parse_timer_event(self, events: list[Event], event: TimerEvent): timer.update( { "state": event.state, + "cause": event.cause, "cancel_failed_event_id": event.id, "cancel_failed_event_timestamp": event.timestamp, }