diff --git a/simpleflow/history.py b/simpleflow/history.py index 9eae9a04..424f34d9 100644 --- a/simpleflow/history.py +++ b/simpleflow/history.py @@ -760,6 +760,7 @@ def parse_timer_event(self, events: list[Event], event: TimerEvent): timer.update( { "state": event.state, + "cause": event.cause, "cancel_failed_event_id": event.id, "cancel_failed_event_timestamp": event.timestamp, } diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index fdde96d4..efc8967c 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -38,6 +38,7 @@ from simpleflow.workflow import Workflow if TYPE_CHECKING: + from simpleflow.activity import NotSet from simpleflow.swf.mapper.models.domain import Domain __all__ = ["Executor"] @@ -199,7 +200,7 @@ def history(self) -> History | None: def _make_task_id( self, - a_task: ActivityTask | WorkflowTask, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, workflow_id: str, run_id: str, *args, @@ -428,7 +429,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any]) future = futures.Future() if not event: return future - state = event["state"] + state: str = event["state"] if state == "started": future.set_running() elif state == "fired": @@ -438,7 +439,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any]) elif state in ("start_failed", "cancel_failed"): future.set_exception( exceptions.TaskFailed( - name=event["timer_id"], + name=event["id"], reason=event["cause"], ) ) @@ -502,7 +503,7 @@ def find_timer_event(self, a_task: TimerTask | CancelTimerTask, history: History return None return event - TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, callable]] = { + TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, Callable]] = { ActivityTask: find_activity_event, WorkflowTask: find_child_workflow_event, SignalTask: find_signal_event, @@ -771,7 +772,7 @@ def _add_start_timer_decision(self, id, timeout=0): def resume( self, - a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, *args, **kwargs, ) -> futures.Future: @@ -843,7 +844,7 @@ def resume( def make_task_id( self, - a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, *args, **kwargs, ) -> None: @@ -857,7 +858,11 @@ def make_task_id( workflow_id, run_id = self._workflow_id, self._run_id a_task.id = self._make_task_id(a_task, workflow_id, run_id, *args, **kwargs) - def _compute_priority(self, priority_set_on_submit, a_task): + def _compute_priority( + self, + priority_set_on_submit: str | int | NotSet, + a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask, + ) -> str | int | None: """ Computes the correct task priority, with the following precedence (first is better/preferred): @@ -866,14 +871,7 @@ def _compute_priority(self, priority_set_on_submit, a_task): - priority set on the workflow execution - None otherwise - :param priority_set_on_submit: - :type priority_set_on_submit: str|int|PRIORITY_NOT_SET - - :param a_task: - :type a_task: ActivityTask|WorkflowTask - :returns: the priority for this task - :rtype: str|int|None """ if priority_set_on_submit is not PRIORITY_NOT_SET: return priority_set_on_submit diff --git a/simpleflow/swf/mapper/models/decision/__init__.py b/simpleflow/swf/mapper/models/decision/__init__.py index f9a01dbe..43305658 100644 --- a/simpleflow/swf/mapper/models/decision/__init__.py +++ b/simpleflow/swf/mapper/models/decision/__init__.py @@ -1,3 +1,4 @@ +from simpleflow.swf.mapper.models.decision.base import Decision # NOQA from simpleflow.swf.mapper.models.decision.marker import MarkerDecision # NOQA from simpleflow.swf.mapper.models.decision.task import ActivityTaskDecision # NOQA from simpleflow.swf.mapper.models.decision.timer import TimerDecision # NOQA diff --git a/simpleflow/swf/mapper/querysets/workflow.py b/simpleflow/swf/mapper/querysets/workflow.py index b19fe507..022da4ee 100644 --- a/simpleflow/swf/mapper/querysets/workflow.py +++ b/simpleflow/swf/mapper/querysets/workflow.py @@ -133,9 +133,9 @@ def get(self, name, version, *args, **kwargs): error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) from e + raise DoesNotExistError(message or "") from e - raise ResponseError(message, error_code=error_code) from e + raise ResponseError(message or "", error_code=error_code or "") from e wt_info = response[self._infos] wt_config = response["configuration"] @@ -465,12 +465,14 @@ def to_WorkflowExecution(self, domain: Domain, execution_info: dict[str, Any], * execution_info["workflowType"]["version"], ) + workflow_id: str = get_subkey(execution_info, ["execution", "workflowId"]) # type: ignore + status: str = execution_info.get("executionStatus") # type: ignore return WorkflowExecution( domain, - get_subkey(execution_info, ["execution", "workflowId"]), # workflow_id + workflow_id, run_id=get_subkey(execution_info, ["execution", "runId"]), workflow_type=workflow_type, - status=execution_info.get("executionStatus"), + status=status, close_status=execution_info.get("closeStatus"), tag_list=execution_info.get("tagList"), start_timestamp=execution_info.get("startTimestamp"), @@ -488,9 +490,9 @@ def get(self, workflow_id, run_id, *args, **kwargs): error_code = extract_error_code(e) message = extract_message(e) if error_code == "UnknownResourceFault": - raise DoesNotExistError(message) from e + raise DoesNotExistError(message or "") from e - raise ResponseError(message, error_code=error_code) from e + raise ResponseError(message or "", error_code=error_code or "") from e execution_info = response[self._infos] execution_config = response["executionConfiguration"] diff --git a/simpleflow/swf/task.py b/simpleflow/swf/task.py index f272a7a9..ad5425cc 100644 --- a/simpleflow/swf/task.py +++ b/simpleflow/swf/task.py @@ -374,14 +374,14 @@ def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decisio class TimerTask(task.TimerTask, SwfTask): - idempotent = True - @classmethod def from_generic_task(cls, a_task: task.TimerTask) -> Self: - return cls(a_task.timer_id, a_task.timeout, a_task.control) + return cls(a_task.timer_id, a_task.timeout, a_task.control, a_task.idempotent) - def __init__(self, timer_id: str, timeout: int | str, control: dict[str, Any] | None) -> None: - super().__init__(timer_id, timeout, control) + def __init__( + self, timer_id: str, timeout: int | str, control: dict[str, Any] | None, idempotent: bool = True + ) -> None: + super().__init__(timer_id, timeout, control, idempotent) def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decision.Decision]: decision = simpleflow.swf.mapper.models.decision.TimerDecision( @@ -394,14 +394,12 @@ def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decisio class CancelTimerTask(task.CancelTimerTask, SwfTask): - idempotent = True - @classmethod def from_generic_task(cls, a_task: task.CancelTimerTask) -> Self: - return cls(a_task.timer_id) + return cls(a_task.timer_id, a_task.idempotent) - def __init__(self, timer_id: str) -> None: - super().__init__(timer_id) + def __init__(self, timer_id: str, idempotent: bool = True) -> None: + super().__init__(timer_id, idempotent) def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decision.Decision]: decision = simpleflow.swf.mapper.models.decision.TimerDecision( diff --git a/simpleflow/task.py b/simpleflow/task.py index 322d8322..502880d0 100644 --- a/simpleflow/task.py +++ b/simpleflow/task.py @@ -241,10 +241,13 @@ class TimerTask(Task): Timer. """ - def __init__(self, timer_id: str, timeout: str | int, control: dict[str, Any] | None = None) -> None: + def __init__( + self, timer_id: str, timeout: str | int, control: dict[str, Any] | None = None, idempotent: bool = True + ) -> None: self.timer_id = timer_id self.timeout = timeout self.control = control + self.idempotent = idempotent self.args: tuple = () self.kwargs: dict[str, Any] = {} @@ -257,7 +260,7 @@ def id(self) -> str: return self.timer_id def __repr__(self) -> str: - return f'<{self.__class__.__name__} timer_id="{self.timer_id}" timeout={self.timeout}>' + return f'<{self.__class__.__name__} timer_id="{self.timer_id}" timeout={self.timeout} idempotent={self.idempotent}>' def execute(self): # Local execution @@ -271,8 +274,9 @@ class CancelTimerTask(Task): Timer cancellation. """ - def __init__(self, timer_id: str) -> None: + def __init__(self, timer_id: str, idempotent: bool = True) -> None: self.timer_id = timer_id + self.idempotent = idempotent self.args: tuple = () self.kwargs: dict[str, Any] = {} @@ -285,7 +289,7 @@ def id(self) -> str: return self.timer_id def __repr__(self) -> str: - return f'<{self.__class__.__name__} timer_id="{self.timer_id}">' + return f'<{self.__class__.__name__} timer_id="{self.timer_id} idempotent={self.idempotent}">' def execute(self) -> None: # Local execution: no-op