diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index b59d290b8..4034e663c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -14,14 +14,14 @@ repos:
       - --maxkb=1024
   - repo: https://github.com/astral-sh/uv-pre-commit
     # uv version.
-    rev: 0.5.24
+    rev: 0.6.14
     hooks:
       # Keep uv.lock up to date.
      - id: uv-lock
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: v0.9.3
+    rev: v0.11.5
     hooks:
       # Run the linter.
      - id: ruff
diff --git a/examples/basic.py b/examples/basic.py
index c3083b041..30380bd10 100644
--- a/examples/basic.py
+++ b/examples/basic.py
@@ -10,7 +10,7 @@ def increment(x):
     # Here's how you can access the raw context of the activity task if you need
     # it. It gives you access to the response of the PollForActivityTask call to
-    # the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax  # NOQA
+    # the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax
     logger.warning(f"activity context: {increment.context}")
     return x + 1
diff --git a/pyproject.toml b/pyproject.toml
index 8d17fc853..865bd60a1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -67,10 +67,11 @@ extend-select = [
     "W",    # pycodestyle warnings
     "RUF",  # ruff
 ]
-fixable = ["ALL"]
 allowed-confusables = ["‘", "’"]
 extend-ignore = [
 ]
+ignore = ["E501"]
+
 [tool.ruff.lint.isort]
 required-imports = ["from __future__ import annotations"]
diff --git a/simpleflow/command.py b/simpleflow/command.py
index 74925c931..1fb5d3cbd 100644
--- a/simpleflow/command.py
+++ b/simpleflow/command.py
@@ -236,6 +236,9 @@ def terminate_workflow(
     run_id: str | None,
 ):
     ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
+    if not ex:
+        print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
+        sys.exit(1)
     ex.terminate()
 
 
@@ -251,6 +254,9 @@
 )
 def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
     ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
+    if not ex:
+        print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
+        sys.exit(1)
     history = ex.history()
     ex.terminate(reason="workflow.restart")
     new_ex = ex.workflow_type.start_execution(
@@ -315,6 +321,7 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
 )
 
 
+# FIXME superseded by history
 @click.option(
     "--nb-tasks",
     "-n",
@@ -347,6 +354,7 @@ def workflow_tasks(
 )
 
 
+# FIXME superseded by filter
 @click.argument(
     "domain",
     envvar="SWF_DOMAIN",
@@ -373,16 +381,16 @@ def list_workflows(ctx, domain: str, status: str, started_since: int):
 _NOTSET = object()
 
 
-@click.argument(
-    "domain",
-    envvar="SWF_DOMAIN",
-)
 @cli.command(
     "workflow.history",
     help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
 )
 @click.argument("workflow_id")
 @click.argument("run_id", type=RUN_ID, required=False)
+@click.option(
+    "--domain",
+    envvar="SWF_DOMAIN",
+)
 @click.option(
     "--output-format",
     "--of",
@@ -402,12 +410,15 @@ def workflow_history(
     output_format: str,
     reverse_order: bool = False,
 ) -> None:
-    from simpleflow.swf.mapper.models.history.base import History as BaseHistory
-
     if ctx.parent.params["format"] != "json" or not ctx.parent.params["header"]:
         raise NotImplementedError("Only pretty JSON mode is implemented")
 
+    from simpleflow.swf.mapper.models.history.base import History as BaseHistory
+
     ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
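+    # helpers.get_workflow_execution() may return None now (see simpleflow/swf/helpers.py below)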
+    if not ex:
+        print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
+        sys.exit(1)
     events = ex.history_events(
         callback=get_progression_callback("events"),
         reverse_order=reverse_order,
@@ -432,11 +443,15 @@ def workflow_history(
     elif output_format == "cooked":
         history.parse()
         events = {
+            "workflow": history.workflow,
             "activities": history.activities,
             "child_workflows": history.child_workflows,
             "markers": history.markers,
-            "signals": history.signals,
             "timers": history.timers,
+            "signals": history.signals,
+            "signal_lists": history.signal_lists,
+            "external_workflows_signaling": history.external_workflows_signaling,
+            "signaled_workflows": history.signaled_workflows,
         }
     else:
         raise NotImplementedError
@@ -840,6 +855,11 @@ def standalone(
             ex.workflow_id,
             ex.run_id,
         )
+        if not ex:
+            # ex is None here, so don't dereference it in the message
+            print(f"Execution {workflow_id} not found")
+            sys.exit(1)
         if display_status:
             print(f"status: {ex.status}", file=sys.stderr)
         if ex.status == ex.STATUS_CLOSED:
diff --git a/simpleflow/history.py b/simpleflow/history.py
index 34faaac51..9eae9a046 100644
--- a/simpleflow/history.py
+++ b/simpleflow/history.py
@@ -43,11 +43,16 @@ def __init__(self, history: simpleflow.swf.mapper.models.history.History) -> Non
         self.started_decision_id: int | None = None
         self.completed_decision_id: int | None = None
         self.last_event_id: int | None = None
+        self._workflow: dict[str, Any] = {}
 
     @property
     def swf_history(self) -> simpleflow.swf.mapper.models.history.History:
         return self._history
 
+    @property
+    def workflow(self) -> dict[str, Any]:
+        return self._workflow
+
     @property
     def activities(self) -> dict[str, ActivityTaskEventDict]:
         """
@@ -432,6 +437,118 @@ def parse_workflow_event(self, events: list[Event], event: WorkflowExecutionEven
         """
         Parse a workflow event.
         """
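+        # Fold workflow-level events into self._workflow: keys are prefixed with
+        # the event state, and "state" always reflects the last event parsed.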
""" + if event.state == "started": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + "child_policy": getattr(event, "child_policy", None), + "task_list": event.task_list["name"], + "workflow_type": event.workflow_type, + "continued_execution_run_id": getattr(event, "continued_execution_run_id", None), + "execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None), + "input": getattr(event, "input", None), + "lambda_role": getattr(event, "lambda_role", None), + "parent_initiated_event_id": getattr(event, "parent_initiated_event_id", None), + "parent_workflow_execution": getattr(event, "parent_workflow_execution", None), + "tag_list": getattr(event, "tag_list", None), + "task_priority": getattr(event, "task_priority", None), + "task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None), + } + ) + elif event.state == "continued_as_new": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id, + "new_execution_run_id": event.new_execution_run_id, + "task_list": event.task_list["name"], + "workflow_type": event.workflow_type, + "execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None), + "input": getattr(event, "input", None), + "lambda_role": getattr(event, "lambda_role", None), + "tag_list": getattr(event, "tag_list", None), + "task_priority": getattr(event, "task_priority", None), + "task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None), + } + ) + elif event.state == "completed": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + "initiated_event_id": getattr(event, "initiated_event_id", None), + "result": getattr(event, "result", None), + } + ) + elif event.state == "cancelled": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + "initiated_event_id": getattr(event, "initiated_event_id", None), + "decision_task_completed_event_id": event.decision_task_completed_event_id, + "details": getattr(event, "details", None), + } + ) + elif event.state == "failed": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + "initiated_event_id": getattr(event, "initiated_event_id", None), + "decision_task_completed_event_id": event.decision_task_completed_event_id, + "reason": getattr(event, "reason", None), + "details": getattr(event, "details", None), + } + ) + elif event.state == "terminated": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + "initiated_event_id": getattr(event, "initiated_event_id", None), + "cause": getattr(event, "cause", None), + "details": getattr(event, "details", None), + } + ) + elif event.state == "timed_out": + self._workflow.update( + { + "state": event.state, + f"{event.state}_id": event.id, + f"{event.state}_timestamp": event.timestamp, + "initiated_event_id": getattr(event, "initiated_event_id", None), + "timeout_type": event.timeout_type, + } + ) + # elif event.state in ( + # "cancel_failed", + # "complete_failed", + # "continue_as_new", + # "fail_failed", + # 
"start_child_failed", + # "start_failed", + # "terminate_failed", + # ): + # self._workflow.update( + # { + # "state": event.state, + # f"{event.state}_id": event.id, + # f"{event.state}_cause": getattr(event, "cause", None), + # f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id, + # } + # ) + + if event.state == "cancel_requested": + self._workflow.update() if event.state == "signaled": signal = { "type": "signal", diff --git a/simpleflow/local/executor.py b/simpleflow/local/executor.py index 7ad94bcd6..c42f031f5 100644 --- a/simpleflow/local/executor.py +++ b/simpleflow/local/executor.py @@ -1,6 +1,5 @@ from __future__ import annotations -import collections import sys import traceback import uuid @@ -27,8 +26,8 @@ def __init__(self, workflow_class, **kwargs): super().__init__(workflow_class) self.update_workflow_class() self.nb_activities = 0 - self.signals_sent = set() - self._markers = collections.OrderedDict() + self.signals_sent: set[str] = set() + self._markers: dict[str, list[Marker]] = {} self.wf_run_id = [] self.wf_id = [] @@ -206,7 +205,7 @@ def continue_as_new(self, workflow: type[Workflow], *args, **kwargs): self.update_workflow_class() self.nb_activities = 0 self.signals_sent = set() - self._markers = collections.OrderedDict() + self._markers = {} self.wf_run_id = [] self.wf_id = [] diff --git a/simpleflow/metrology.py b/simpleflow/metrology.py index 8f4a97637..ce23278f8 100644 --- a/simpleflow/metrology.py +++ b/simpleflow/metrology.py @@ -6,7 +6,6 @@ import re import time from abc import ABC -from collections import OrderedDict from typing import Any from urllib.parse import quote_plus @@ -18,31 +17,29 @@ class StepIO: - def __init__(self): + def __init__(self) -> None: self.bytes = 0 self.records = 0 self.sampled = False - def get_stats(self, time_total): + def get_stats(self, time_total: float) -> dict[str, Any]: mb_s = None rec_s = None if self.bytes: mb_s = round(float(self.bytes) / (1024 * 1024) / time_total, 2) if self.records: rec_s = int(self.records / time_total) - return OrderedDict( - [ - ("bytes", self.bytes), - ("records", self.records), - ("mb_s", mb_s), - ("rec_s", rec_s), - ("sampled", self.sampled), - ] - ) + return { + "bytes": self.bytes, + "records": self.records, + "mb_s": mb_s, + "rec_s": rec_s, + "sampled": self.sampled, + } class Step: - def __init__(self, name, task): + def __init__(self, name: str, task: MetrologyTask) -> None: self.name = name self.task = task self.read = StepIO() @@ -57,22 +54,17 @@ def done(self): self.time_total = self.time_finished - self.time_started def get_stats(self): - stats = OrderedDict( - [ - ("name", self.name), - ("metadata", self.metadata), - ("time_started", self.time_started), - ("time_finished", self.time_finished), - ("time_total", self.time_total), - ("read", self.read.get_stats(self.time_total)), - ("write", self.write.get_stats(self.time_total)), - ] - ) - return stats + stats = { + "name": self.name, + "metadata": self.metadata, + "time_started": self.time_started, + "time_finished": self.time_finished, + "time_total": self.time_total, + "read": self.read.get_stats(self.time_total), + "write": self.write.get_stats(self.time_total), + } - def mset_metadata(self, kvs): - for k, v in kvs: - self.metadata[k] = v + return stats class StepExecution: @@ -90,14 +82,14 @@ class MetrologyTask: context: dict[str, Any] steps: list[Step] - def can_upload(self): + def can_upload(self) -> bool: if not hasattr(self, "context"): return False return all(c in self.context for c in 
("workflow_id", "run_id", "activity_id")) @property def metrology_path(self): - path = [] + path: list[str] = [] if settings.METROLOGY_PATH_PREFIX is not None: path.append(settings.METROLOGY_PATH_PREFIX) path.append(self.context["workflow_id"]) @@ -105,7 +97,7 @@ def metrology_path(self): path.append(f"activity.{self.context['activity_id']}.json") return str(os.path.join(*path)) - def step(self, name): + def step(self, name: str) -> StepExecution: """ To be called in a `with` execution Ex : diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index 1577f0209..fdde96d4e 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -170,9 +170,9 @@ def __init__( self._tasks = TaskRegistry() self._idempotent_tasks_to_submit = set() self._execution = None - self.current_priority = None - self.handled_failures = {} - self.created_activity_types = set() + self.current_priority: str | int | None = None + self.handled_failures: dict[int, Any] = {} + self.created_activity_types: set[tuple[str, str]] = set() def reset(self): """ @@ -223,7 +223,7 @@ def _make_task_id( # It makes the workflow resistant to retries or variations on the # same task name (see #11). arguments = json_dumps({"args": args, "kwargs": kwargs}) - suffix = hashlib.md5(arguments.encode("utf-8")).hexdigest() # nosec + suffix = hashlib.md5(arguments.encode()).hexdigest() # nosec if isinstance(a_task, WorkflowTask): # Some task types must have globally unique names. @@ -231,7 +231,7 @@ def _make_task_id( task_id = f"{a_task.name}-{suffix}" if len(task_id) > 256: # Better safe than sorry... - task_id = task_id[0:223] + "-" + hashlib.md5(task_id.encode("utf-8")).hexdigest() # nosec + task_id = task_id[0:223] + "-" + hashlib.md5(task_id.encode()).hexdigest() # nosec return task_id def _get_future_from_activity_event(self, event: dict[str, Any]) -> futures.Future | None: @@ -248,16 +248,7 @@ def _get_future_from_activity_event(self, event: dict[str, Any]) -> futures.Futu name = event["activity_type"]["name"] version = event["activity_type"]["version"] if event["cause"] == "ACTIVITY_TYPE_DOES_NOT_EXIST" and (name, version) not in self.created_activity_types: - self.created_activity_types.add((name, version)) - activity_type = simpleflow.swf.mapper.models.ActivityType(self.domain, name=name, version=version) - logger.info(f"creating activity type {activity_type.name} in domain {self.domain.name}") - try: - activity_type.save() - except simpleflow.swf.mapper.exceptions.AlreadyExistsError: - logger.info( - f"oops: Activity type {activity_type.name} in domain {self.domain.name} already exists," - f" creation failed, continuing..." - ) + self.create_activity_type(name, version) return None logger.info(f"failed to schedule {name}: {event['cause']}") return None @@ -283,6 +274,18 @@ def _get_future_from_activity_event(self, event: dict[str, Any]) -> futures.Futu return future + def create_activity_type(self, name: str, version: str) -> None: + self.created_activity_types.add((name, version)) + activity_type = simpleflow.swf.mapper.models.ActivityType(self.domain, name=name, version=version) + logger.info(f"creating activity type {activity_type.name} in domain {self.domain.name}") + try: + activity_type.save() + except simpleflow.swf.mapper.exceptions.AlreadyExistsError: + logger.info( + f"oops: Activity type {activity_type.name} in domain {self.domain.name} already exists," + f" creation failed, continuing..." 
+    def create_activity_type(self, name: str, version: str) -> None:
+        self.created_activity_types.add((name, version))
+        activity_type = simpleflow.swf.mapper.models.ActivityType(self.domain, name=name, version=version)
+        logger.info(f"creating activity type {activity_type.name} in domain {self.domain.name}")
+        try:
+            activity_type.save()
+        except simpleflow.swf.mapper.exceptions.AlreadyExistsError:
+            logger.info(
+                f"oops: Activity type {activity_type.name} in domain {self.domain.name} already exists,"
+                f" creation failed, continuing..."
+            )
+
     def _get_future_from_child_workflow_event(self, event: dict[str, Any]) -> futures.Future | None:
         """Maps a child workflow event to a Future with the corresponding state.
 
@@ -797,7 +800,7 @@ def resume(
         # ... but only keep the event if the task was successful
         if former_event and former_event["state"] == "completed":
             logger.info(f"faking task completed successfully in previous workflow: {former_event['id']}")
-            json_hash = hashlib.md5(json_dumps(former_event).encode("utf-8")).hexdigest()  # nosec
+            json_hash = hashlib.md5(json_dumps(former_event).encode()).hexdigest()  # nosec
             fake_task_list = "FAKE-" + json_hash
 
             # schedule task on a fake task list
diff --git a/simpleflow/swf/helpers.py b/simpleflow/swf/helpers.py
index 09227ddaa..be5ac17df 100644
--- a/simpleflow/swf/helpers.py
+++ b/simpleflow/swf/helpers.py
@@ -5,6 +5,7 @@
 import json
 import os
 import socket
+import sys
 from typing import TYPE_CHECKING
 
 import psutil
@@ -29,11 +30,12 @@
 ]
 
 
-def get_workflow_execution(domain_name: str, workflow_id: str, run_id: str | None = None) -> WorkflowExecution:
+def get_workflow_execution(domain_name: str, workflow_id: str, run_id: str | None = None) -> WorkflowExecution | None:
     def filter_execution(*args, **kwargs):
         if "workflow_status" in kwargs:
             kwargs["status"] = kwargs.pop("workflow_status")
-        return query.filter(*args, **kwargs)[0]
+        filtered_executions = query.filter(*args, **kwargs)
+        return filtered_executions[0] if filtered_executions else None
 
     domain = simpleflow.swf.mapper.models.Domain(domain_name)
     query = simpleflow.swf.mapper.querysets.WorkflowExecutionQuerySet(domain)
@@ -61,6 +63,9 @@ def show_workflow_info(domain_name, workflow_id, run_id=None):
         workflow_id,
         run_id,
     )
+    if not workflow_execution:
+        print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
+        sys.exit(1)
     return pretty.info(workflow_execution)
 
 
@@ -70,6 +75,9 @@ def show_workflow_profile(domain_name, workflow_id, run_id=None, nb_tasks=None):
         workflow_id,
         run_id,
     )
+    if not workflow_execution:
+        print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
+        sys.exit(1)
     return pretty.profile(workflow_execution, nb_tasks)
 
 
@@ -79,6 +87,9 @@ def show_workflow_status(domain_name: str, workflow_id: str, run_id: str | None
         workflow_id,
         run_id,
     )
+    if not workflow_execution:
+        print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
+        sys.exit(1)
     return pretty.status(workflow_execution, nb_tasks)
 
 
@@ -171,6 +182,9 @@ def get_task(
         domain_name,
         workflow_id,
     )
+    if not workflow_execution:
+        print(f"Workflow {workflow_id} not found")
+        sys.exit(1)
     return pretty.get_task(workflow_execution, task_id, details)
diff --git a/simpleflow/swf/mapper/models/base.py b/simpleflow/swf/mapper/models/base.py
index 52a9599ec..f44fa47b6 100644
--- a/simpleflow/swf/mapper/models/base.py
+++ b/simpleflow/swf/mapper/models/base.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from collections import OrderedDict, namedtuple
+from collections import namedtuple
 
 from simpleflow.swf.mapper.core import ConnectedSWFObject
 from simpleflow.swf.mapper.exceptions import DoesNotExistError
@@ -13,15 +13,13 @@ class ModelDiff:
     :param input: triples (tuples) storing in order: compared attribute name,
                   local model attribute value, upstream model attribute value.
-    :type input: *args
 
     :param ignore_fields: list of fields to ignore when comparing local and upstream
-    :type ignore_fields: list
     """
 
-    def __init__(self, *input, ignore_fields: list[str] | None = None):
+    def __init__(self, *input: tuple[str, str, str], ignore_fields: list[str] | None = None):
         self.ignore_fields = ignore_fields or []
-        self.container: OrderedDict[str, tuple[str, str]] = self._process_input(input)
+        self.container: dict[str, tuple[str, str]] = self._process_input(input)
 
     def __contains__(self, attr):
         return attr in self.container
@@ -33,14 +31,14 @@ def __getitem__(self, index):
         attr, (local, upstream) = list(self.container.items())[index]
         return Difference(attr, local, upstream)
 
-    def _process_input(self, input):
-        return OrderedDict(
-            (attr, (local, upstream))
+    def _process_input(self, input: tuple[tuple[str, str, str], ...]) -> dict[str, tuple[str, str]]:
+        return {
+            attr: (local, upstream)
             for attr, local, upstream in input
             if local != upstream and attr not in self.ignore_fields
-        )
+        }
 
-    def add_input(self, *input):
+    def add_input(self, *input: tuple[str, str, str]):
         """Adds input differing data into ModelDiff instance"""
         self.container.update(self._process_input(input))
diff --git a/simpleflow/swf/mapper/models/event/base.py b/simpleflow/swf/mapper/models/event/base.py
index 186d41f9b..47e4e9317 100644
--- a/simpleflow/swf/mapper/models/event/base.py
+++ b/simpleflow/swf/mapper/models/event/base.py
@@ -42,6 +42,7 @@ class Event:
     :param raw_data: raw_event representation provided by amazon service
     """
 
+    # Class-level defaults; subclasses and instances set their own values
     _type: str | None = None
     _name: str | None = None
     _attributes_key: str | None = None
diff --git a/simpleflow/swf/mapper/models/event/workflow.py b/simpleflow/swf/mapper/models/event/workflow.py
index aa8c73eb5..c4a9d8a35 100644
--- a/simpleflow/swf/mapper/models/event/workflow.py
+++ b/simpleflow/swf/mapper/models/event/workflow.py
@@ -24,10 +24,44 @@ class WorkflowExecution(TypedDict):
 
 
 class WorkflowExecutionEvent(Event):
     _type = "WorkflowExecution"
 
+    # start
     initiated_event_id: int
+    child_policy: str
+    task_list: TaskList
+    workflow_type: WorkflowType
+    continued_execution_run_id: str | None
+    execution_start_to_close_timeout: str | None
+    input: str | None
+    lambda_role: str | None
+    parent_initiated_event_id: int | None
+    parent_workflow_execution: WorkflowExecution | None
+    tag_list: list[str] | None
+    task_priority: str | None
+    task_start_to_close_timeout: str | None
+
+    # continued_as_new
+    new_execution_run_id: str
+    signal_name: str
+    decision_task_completed_event_id: int
+
+    # completed
+    result: str | None
+
+    # terminated
+    # child_policy: str
+    cause: str | None
+    details: str | None
+    reason: str | None
+
+    # timed_out
+    # child_policy: str
+    timeout_type: str | None
+
+    # workflow_execution: WorkflowExecution
+    # close_status: str
+
 
 class CompiledWorkflowExecutionEvent(CompiledEvent):
     _type = "WorkflowExecution"
diff --git a/simpleflow/swf/mapper/models/workflow.py b/simpleflow/swf/mapper/models/workflow.py
index 4c3d496bd..edff33d88 100644
--- a/simpleflow/swf/mapper/models/workflow.py
+++ b/simpleflow/swf/mapper/models/workflow.py
@@ -251,7 +251,7 @@ def start_execution(
         :param decision_tasks_timeout: maximum duration of decision tasks
                                        for this workflow execution
         """
-        workflow_id = workflow_id or f"{self.name}-{self.version}-{time.time():d}"
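+        # time.time() returns a float, which the ":d" format spec rejects with a
+        # ValueError; cast to int for a second-precision timestamp suffix.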
+        workflow_id = workflow_id or f"{self.name}-{self.version}-{int(time.time())}"
         task_list = task_list or self.task_list
         child_policy = child_policy or self.child_policy
 
         if child_policy not in CHILD_POLICIES:
diff --git a/simpleflow/swf/process/decider/helpers.py b/simpleflow/swf/process/decider/helpers.py
index 62576a451..1769e504d 100644
--- a/simpleflow/swf/process/decider/helpers.py
+++ b/simpleflow/swf/process/decider/helpers.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 import simpleflow.swf.mapper.models
 from simpleflow import logger
 from simpleflow.swf.executor import Executor
@@ -7,35 +9,21 @@
 
 from .base import Decider, DeciderPoller
 
+if TYPE_CHECKING:
+    from simpleflow.history import History
+
 
 def load_workflow_executor(
-    domain,
-    workflow_name,
-    task_list=None,
-    repair_with=None,
-    force_activities=None,
-    repair_workflow_id=None,
-    repair_run_id=None,
-):
+    domain: simpleflow.swf.mapper.models.Domain,
+    workflow_name: str,
+    task_list: str | None = None,
+    repair_with: History | None = None,
+    force_activities: str | None = None,
+    repair_workflow_id: str | None = None,
+    repair_run_id: str | None = None,
+) -> Executor:
     """
     Load a workflow executor.
-
-    :param domain:
-    :type domain: simpleflow.swf.mapper.models.Domain
-    :param workflow_name:
-    :type workflow_name: str
-    :param task_list:
-    :type task_list: Optional[str]
-    :param repair_with:
-    :type repair_with: Optional[simpleflow.history.History]
-    :param force_activities:
-    :type force_activities: Optional[str]
-    :param repair_workflow_id: workflow ID to repair
-    :type repair_workflow_id: Optional[str]
-    :param repair_run_id: run ID to repair
-    :type repair_run_id: Optional[str]
-    :return: Executor for this workflow
-    :rtype: Executor
     """
     logger.debug(f'load_workflow_executor(workflow_name="{workflow_name}")')
     workflow = import_from_module(workflow_name)
@@ -55,43 +43,27 @@
 def make_decider_poller(
-    workflows,
-    domain,
-    task_list,
-    repair_with=None,
-    force_activities=None,
-    is_standalone=False,
-    repair_workflow_id=None,
-    repair_run_id=None,
-):
+    workflows: list[str],
+    domain_name: str,
+    task_list: str,
+    repair_with: History | None = None,
+    force_activities: str | None = None,
+    is_standalone: bool = False,
+    repair_workflow_id: str | None = None,
+    repair_run_id: str | None = None,
+) -> DeciderPoller:
     """
     Factory building a decider poller.
-
-    :param workflows:
-    :type workflows:
-    :param domain:
-    :type domain:
-    :param task_list:
-    :type task_list:
-    :param repair_with:
-    :type repair_with: Optional[simpleflow.history.History]
-    :param force_activities:
-    :type force_activities: Optional[str]
-    :param is_standalone: Whether the executor use this task list (and pass it to the workers)
-    :type is_standalone: bool
-    :param repair_workflow_id: workflow ID to repair
-    :type repair_workflow_id: Optional[str]
-    :param repair_run_id: run ID to repair
-    :type repair_run_id: Optional[str]
-    :return:
-    :rtype: DeciderPoller
     """
     if repair_with and len(workflows) != 1:
-        # too complicated ; I even wonder why passing multiple workflows here is
+        # too complicated; I even wonder why passing multiple workflows here is
         # useful, a domain+task_list is typically handled in a single workflow
         # definition, seems like good practice (?)
raise ValueError("Sorry you can't repair more than 1 workflow at once!") - domain = simpleflow.swf.mapper.models.Domain(domain) + domain = simpleflow.swf.mapper.models.Domain(domain_name) + if not domain.exists: + domain.save() executors = [ load_workflow_executor( domain, @@ -108,38 +80,23 @@ def make_decider_poller( def make_decider( - workflows, - domain, - task_list, - nb_children=None, - repair_with=None, - force_activities=None, - is_standalone=False, - repair_workflow_id=None, - repair_run_id=None, -): + workflows: list[str], + domain: str, + task_list: str, + nb_children: int | None = None, + repair_with: History | None = None, + force_activities: str | None = None, + is_standalone: bool = False, + repair_workflow_id: str | None = None, + repair_run_id: str | None = None, +) -> Decider: """ Instantiate a Decider. - :param workflows: - :type workflows: list[str] - :param domain: - :type domain: str - :param task_list: - :type task_list: str - :param nb_children: - :type nb_children: Optional[int] - :param repair_with: previous history - :type repair_with: Optional[simpleflow.history.History] - :param force_activities: Regex matching the activities to force - :type force_activities: Optional[str] - :param is_standalone: Whether the executor use this task list (and pass it to the workers) - :type is_standalone: bool - :param repair_workflow_id: workflow ID to repair - :type repair_workflow_id: Optional[str] - :param repair_run_id: run ID to repair - :type repair_run_id: Optional[str] - :return: - :rtype: Decider + repair_with: previous history + force_activities: Regex matching the activities to force + is_standalone: Whether the executor uses this task list (and pass it to the workers) + repair_workflow_id: workflow ID to repair + repair_run_id: run ID to repair """ poller = make_decider_poller( workflows, diff --git a/simpleflow/swf/stats/pretty.py b/simpleflow/swf/stats/pretty.py index c5b753a4a..6e619afdd 100644 --- a/simpleflow/swf/stats/pretty.py +++ b/simpleflow/swf/stats/pretty.py @@ -238,7 +238,6 @@ def list_details( "Workflow Version", "Run ID", "Status", - "Task List", "Child Policy", "Close Status", "Start Timestamp", @@ -248,6 +247,9 @@ def list_details( "Input", "Tags", "Decision Tasks Timeout", + "Parent Workflow ID", + "Parent Run ID", + "Cancel Requested", ) rows = [ ( @@ -256,7 +258,6 @@ def list_details( execution.workflow_type.version, execution.run_id, execution.status, - execution.task_list, execution.child_policy, execution.close_status, execution.start_timestamp, @@ -266,6 +267,9 @@ def list_details( execution.input, execution.tag_list, execution.decision_tasks_timeout, + execution.parent.get("workflowId"), + execution.parent.get("runId"), + execution.cancel_requested, ) for execution in workflow_executions ]