Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ repos:
- --maxkb=1024
- repo: https://github.com/astral-sh/uv-pre-commit
# uv version.
rev: 0.5.24
rev: 0.6.14
hooks:
# Keep uv.lock up to date.
- id: uv-lock

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.9.3
rev: v0.11.5
hooks:
# Run the linter.
- id: ruff
Expand Down
2 changes: 1 addition & 1 deletion examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
def increment(x):
# Here's how you can access the raw context of the activity task if you need
# it. It gives you access to the response of the PollForActivityTask call to
# the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax # NOQA
# the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax
logger.warning(f"activity context: {increment.context}")
return x + 1

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ extend-select = [
"W", # pycodestyle warnings
"RUF", # ruff
]
fixable = ["ALL"]
allowed-confusables = ["‘", "’"]
extend-ignore = [
]
ignore = ["E501"]

[tool.ruff.lint.isort]
required-imports = ["from __future__ import annotations"]

Expand Down
34 changes: 27 additions & 7 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ def terminate_workflow(
run_id: str | None,
):
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
sys.exit(1)
ex.terminate()


Expand All @@ -251,6 +254,9 @@ def terminate_workflow(
)
def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
sys.exit(1)
history = ex.history()
ex.terminate(reason="workflow.restart")
new_ex = ex.workflow_type.start_execution(
Expand Down Expand Up @@ -315,6 +321,7 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
)


# FIXME superseded by history
@click.option(
"--nb-tasks",
"-n",
Expand Down Expand Up @@ -347,6 +354,7 @@ def workflow_tasks(
)


# FIXME superseded by filter
@click.argument(
"domain",
envvar="SWF_DOMAIN",
Expand All @@ -373,16 +381,16 @@ def list_workflows(ctx, domain: str, status: str, started_since: int):
_NOTSET = object()


@click.argument(
"domain",
envvar="SWF_DOMAIN",
)
@cli.command(
"workflow.history",
help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
)
@click.argument("workflow_id")
@click.argument("run_id", type=RUN_ID, required=False)
@click.option(
"--domain",
envvar="SWF_DOMAIN",
)
@click.option(
"--output-format",
"--of",
Expand All @@ -402,12 +410,15 @@ def workflow_history(
output_format: str,
reverse_order: bool = False,
) -> None:
from simpleflow.swf.mapper.models.history.base import History as BaseHistory

if ctx.parent.params["format"] != "json" or not ctx.parent.params["header"]:
raise NotImplementedError("Only pretty JSON mode is implemented")

from simpleflow.swf.mapper.models.history.base import History as BaseHistory

ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
sys.exit(1)
events = ex.history_events(
callback=get_progression_callback("events"),
reverse_order=reverse_order,
Expand All @@ -432,11 +443,15 @@ def workflow_history(
elif output_format == "cooked":
history.parse()
events = {
"workflow": history.workflow,
"activities": history.activities,
"child_workflows": history.child_workflows,
"markers": history.markers,
"signals": history.signals,
"timers": history.timers,
"signals": history.signals,
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
else:
raise NotImplementedError
Expand Down Expand Up @@ -840,6 +855,11 @@ def standalone(
ex.workflow_id,
ex.run_id,
)
if not ex:
print(
f"Execution {workflow_id} {ex.run_id} not found" if ex.run_id else f"Workflow {workflow_id} not found"
)
sys.exit(1)
if display_status:
print(f"status: {ex.status}", file=sys.stderr)
if ex.status == ex.STATUS_CLOSED:
Expand Down
117 changes: 117 additions & 0 deletions simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ def __init__(self, history: simpleflow.swf.mapper.models.history.History) -> Non
self.started_decision_id: int | None = None
self.completed_decision_id: int | None = None
self.last_event_id: int | None = None
self._workflow: dict[str, Any] = {}

@property
def swf_history(self) -> simpleflow.swf.mapper.models.history.History:
return self._history

@property
def workflow(self):
return self._workflow

@property
def activities(self) -> dict[str, ActivityTaskEventDict]:
"""
Expand Down Expand Up @@ -432,6 +437,118 @@ def parse_workflow_event(self, events: list[Event], event: WorkflowExecutionEven
"""
Parse a workflow event.
"""
if event.state == "started":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"child_policy": getattr(event, "child_policy", None),
"task_list": event.task_list["name"],
"workflow_type": event.workflow_type,
"continued_execution_run_id": getattr(event, "continued_execution_run_id", None),
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
"input": getattr(event, "input", None),
"lambda_role": getattr(event, "lambda_role", None),
"parent_initiated_event_id": getattr(event, "parent_initiated_event_id", None),
"parent_workflow_execution": getattr(event, "parent_workflow_execution", None),
"tag_list": getattr(event, "tag_list", None),
"task_priority": getattr(event, "task_priority", None),
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
}
)
elif event.state == "continued_as_new":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
"new_execution_run_id": event.new_execution_run_id,
"task_list": event.task_list["name"],
"workflow_type": event.workflow_type,
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
"input": getattr(event, "input", None),
"lambda_role": getattr(event, "lambda_role", None),
"tag_list": getattr(event, "tag_list", None),
"task_priority": getattr(event, "task_priority", None),
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
}
)
elif event.state == "completed":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"result": getattr(event, "result", None),
}
)
elif event.state == "cancelled":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"decision_task_completed_event_id": event.decision_task_completed_event_id,
"details": getattr(event, "details", None),
}
)
elif event.state == "failed":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"decision_task_completed_event_id": event.decision_task_completed_event_id,
"reason": getattr(event, "reason", None),
"details": getattr(event, "details", None),
}
)
elif event.state == "terminated":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"cause": getattr(event, "cause", None),
"details": getattr(event, "details", None),
}
)
elif event.state == "timed_out":
self._workflow.update(
{
"state": event.state,
f"{event.state}_id": event.id,
f"{event.state}_timestamp": event.timestamp,
"initiated_event_id": getattr(event, "initiated_event_id", None),
"timeout_type": event.timeout_type,
}
)
# elif event.state in (
# "cancel_failed",
# "complete_failed",
# "continue_as_new",
# "fail_failed",
# "start_child_failed",
# "start_failed",
# "terminate_failed",
# ):
# self._workflow.update(
# {
# "state": event.state,
# f"{event.state}_id": event.id,
# f"{event.state}_cause": getattr(event, "cause", None),
# f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
# }
# )

if event.state == "cancel_requested":
self._workflow.update()
if event.state == "signaled":
signal = {
"type": "signal",
Expand Down
7 changes: 3 additions & 4 deletions simpleflow/local/executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import collections
import sys
import traceback
import uuid
Expand All @@ -27,8 +26,8 @@ def __init__(self, workflow_class, **kwargs):
super().__init__(workflow_class)
self.update_workflow_class()
self.nb_activities = 0
self.signals_sent = set()
self._markers = collections.OrderedDict()
self.signals_sent: set[str] = set()
self._markers: dict[str, list[Marker]] = {}

self.wf_run_id = []
self.wf_id = []
Expand Down Expand Up @@ -206,7 +205,7 @@ def continue_as_new(self, workflow: type[Workflow], *args, **kwargs):
self.update_workflow_class()
self.nb_activities = 0
self.signals_sent = set()
self._markers = collections.OrderedDict()
self._markers = {}

self.wf_run_id = []
self.wf_id = []
Expand Down
Loading
Loading