Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ def parse_timer_event(self, events: list[Event], event: TimerEvent):
timer.update(
{
"state": event.state,
"cause": event.cause,
"cancel_failed_event_id": event.id,
"cancel_failed_event_timestamp": event.timestamp,
}
Expand Down
26 changes: 12 additions & 14 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from simpleflow.workflow import Workflow

if TYPE_CHECKING:
from simpleflow.activity import NotSet
from simpleflow.swf.mapper.models.domain import Domain

__all__ = ["Executor"]
Expand Down Expand Up @@ -199,7 +200,7 @@ def history(self) -> History | None:

def _make_task_id(
self,
a_task: ActivityTask | WorkflowTask,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
workflow_id: str,
run_id: str,
*args,
Expand Down Expand Up @@ -428,7 +429,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any])
future = futures.Future()
if not event:
return future
state = event["state"]
state: str = event["state"]
if state == "started":
future.set_running()
elif state == "fired":
Expand All @@ -438,7 +439,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any])
elif state in ("start_failed", "cancel_failed"):
future.set_exception(
exceptions.TaskFailed(
name=event["timer_id"],
name=event["id"],
reason=event["cause"],
)
)
Expand Down Expand Up @@ -502,7 +503,7 @@ def find_timer_event(self, a_task: TimerTask | CancelTimerTask, history: History
return None
return event

TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, callable]] = {
TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, Callable]] = {
ActivityTask: find_activity_event,
WorkflowTask: find_child_workflow_event,
SignalTask: find_signal_event,
Expand Down Expand Up @@ -771,7 +772,7 @@ def _add_start_timer_decision(self, id, timeout=0):

def resume(
self,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
*args,
**kwargs,
) -> futures.Future:
Expand Down Expand Up @@ -843,7 +844,7 @@ def resume(

def make_task_id(
self,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
*args,
**kwargs,
) -> None:
Expand All @@ -857,7 +858,11 @@ def make_task_id(
workflow_id, run_id = self._workflow_id, self._run_id
a_task.id = self._make_task_id(a_task, workflow_id, run_id, *args, **kwargs)

def _compute_priority(self, priority_set_on_submit, a_task):
def _compute_priority(
self,
priority_set_on_submit: str | int | NotSet,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
) -> str | int | None:
"""
Computes the correct task priority, with the following precedence (first
is better/preferred):
Expand All @@ -866,14 +871,7 @@ def _compute_priority(self, priority_set_on_submit, a_task):
- priority set on the workflow execution
- None otherwise

:param priority_set_on_submit:
:type priority_set_on_submit: str|int|PRIORITY_NOT_SET

:param a_task:
:type a_task: ActivityTask|WorkflowTask

:returns: the priority for this task
:rtype: str|int|None
"""
if priority_set_on_submit is not PRIORITY_NOT_SET:
return priority_set_on_submit
Expand Down
1 change: 1 addition & 0 deletions simpleflow/swf/mapper/models/decision/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from simpleflow.swf.mapper.models.decision.base import Decision # NOQA
from simpleflow.swf.mapper.models.decision.marker import MarkerDecision # NOQA
from simpleflow.swf.mapper.models.decision.task import ActivityTaskDecision # NOQA
from simpleflow.swf.mapper.models.decision.timer import TimerDecision # NOQA
Expand Down
14 changes: 8 additions & 6 deletions simpleflow/swf/mapper/querysets/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ def get(self, name, version, *args, **kwargs):
error_code = extract_error_code(e)
message = extract_message(e)
if error_code == "UnknownResourceFault":
raise DoesNotExistError(message) from e
raise DoesNotExistError(message or "") from e

raise ResponseError(message, error_code=error_code) from e
raise ResponseError(message or "", error_code=error_code or "") from e

wt_info = response[self._infos]
wt_config = response["configuration"]
Expand Down Expand Up @@ -465,12 +465,14 @@ def to_WorkflowExecution(self, domain: Domain, execution_info: dict[str, Any], *
execution_info["workflowType"]["version"],
)

workflow_id: str = get_subkey(execution_info, ["execution", "workflowId"]) # type: ignore
status: str = execution_info.get("executionStatus") # type: ignore
return WorkflowExecution(
domain,
get_subkey(execution_info, ["execution", "workflowId"]), # workflow_id
workflow_id,
run_id=get_subkey(execution_info, ["execution", "runId"]),
workflow_type=workflow_type,
status=execution_info.get("executionStatus"),
status=status,
close_status=execution_info.get("closeStatus"),
tag_list=execution_info.get("tagList"),
start_timestamp=execution_info.get("startTimestamp"),
Expand All @@ -488,9 +490,9 @@ def get(self, workflow_id, run_id, *args, **kwargs):
error_code = extract_error_code(e)
message = extract_message(e)
if error_code == "UnknownResourceFault":
raise DoesNotExistError(message) from e
raise DoesNotExistError(message or "") from e

raise ResponseError(message, error_code=error_code) from e
raise ResponseError(message or "", error_code=error_code or "") from e

execution_info = response[self._infos]
execution_config = response["executionConfiguration"]
Expand Down
18 changes: 8 additions & 10 deletions simpleflow/swf/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,14 @@ def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decisio


class TimerTask(task.TimerTask, SwfTask):
idempotent = True

@classmethod
def from_generic_task(cls, a_task: task.TimerTask) -> Self:
return cls(a_task.timer_id, a_task.timeout, a_task.control)
return cls(a_task.timer_id, a_task.timeout, a_task.control, a_task.idempotent)

def __init__(self, timer_id: str, timeout: int | str, control: dict[str, Any] | None) -> None:
super().__init__(timer_id, timeout, control)
def __init__(
self, timer_id: str, timeout: int | str, control: dict[str, Any] | None, idempotent: bool = True
) -> None:
super().__init__(timer_id, timeout, control, idempotent)

def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decision.Decision]:
decision = simpleflow.swf.mapper.models.decision.TimerDecision(
Expand All @@ -394,14 +394,12 @@ def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decisio


class CancelTimerTask(task.CancelTimerTask, SwfTask):
idempotent = True

@classmethod
def from_generic_task(cls, a_task: task.CancelTimerTask) -> Self:
return cls(a_task.timer_id)
return cls(a_task.timer_id, a_task.idempotent)

def __init__(self, timer_id: str) -> None:
super().__init__(timer_id)
def __init__(self, timer_id: str, idempotent: bool = True) -> None:
super().__init__(timer_id, idempotent)

def schedule(self, *args, **kwargs) -> list[simpleflow.swf.mapper.models.decision.Decision]:
decision = simpleflow.swf.mapper.models.decision.TimerDecision(
Expand Down
12 changes: 8 additions & 4 deletions simpleflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,13 @@ class TimerTask(Task):
Timer.
"""

def __init__(self, timer_id: str, timeout: str | int, control: dict[str, Any] | None = None) -> None:
def __init__(
self, timer_id: str, timeout: str | int, control: dict[str, Any] | None = None, idempotent: bool = True
) -> None:
self.timer_id = timer_id
self.timeout = timeout
self.control = control
self.idempotent = idempotent
self.args: tuple = ()
self.kwargs: dict[str, Any] = {}

Expand All @@ -257,7 +260,7 @@ def id(self) -> str:
return self.timer_id

def __repr__(self) -> str:
return f'<{self.__class__.__name__} timer_id="{self.timer_id}" timeout={self.timeout}>'
return f'<{self.__class__.__name__} timer_id="{self.timer_id}" timeout={self.timeout} idempotent={self.idempotent}>'

def execute(self):
# Local execution
Expand All @@ -271,8 +274,9 @@ class CancelTimerTask(Task):
Timer cancellation.
"""

def __init__(self, timer_id: str) -> None:
def __init__(self, timer_id: str, idempotent: bool = True) -> None:
self.timer_id = timer_id
self.idempotent = idempotent
self.args: tuple = ()
self.kwargs: dict[str, Any] = {}

Expand All @@ -285,7 +289,7 @@ def id(self) -> str:
return self.timer_id

def __repr__(self) -> str:
return f'<{self.__class__.__name__} timer_id="{self.timer_id}">'
return f'<{self.__class__.__name__} timer_id="{self.timer_id} idempotent={self.idempotent}">'

def execute(self) -> None:
# Local execution: no-op
Expand Down
Loading