From 76996b1c0e1f7fa191b20be53fa47719185a9f42 Mon Sep 17 00:00:00 2001 From: Karanjot786 Date: Tue, 18 Mar 2025 10:06:52 +0530 Subject: [PATCH 1/6] feat: add WES models with unit tests --- crategen/models/__init__.py | 17 ++ crategen/models/wes_models.py | 234 ++++++++++++++++++++++++++++ tests/unit/test_wes_models.py | 281 ++++++++++++++++++++++++++++++++++ 3 files changed, 532 insertions(+) create mode 100644 crategen/models/wes_models.py create mode 100644 tests/unit/test_wes_models.py diff --git a/crategen/models/__init__.py b/crategen/models/__init__.py index 2da85d7..a35d3aa 100644 --- a/crategen/models/__init__.py +++ b/crategen/models/__init__.py @@ -17,8 +17,17 @@ TESState, TESTaskLog, ) +from .wes_models import ( + State, + WESOutputs, + Log, + TaskLog, + RunRequest, + WESData, +) __all__ = [ + # TES Models "TESData", "TESInput", "TESOutput", @@ -29,4 +38,12 @@ "TESOutputFileLog", "TESFileType", "TESState", + + # WES Models + "State", + "WESOutputs", + "Log", + "TaskLog", + "RunRequest", + "WESData", ] diff --git a/crategen/models/wes_models.py b/crategen/models/wes_models.py new file mode 100644 index 0000000..b4b465f --- /dev/null +++ b/crategen/models/wes_models.py @@ -0,0 +1,234 @@ +""" +Each model in this module conforms to the corresponding WES model names as specified by the GA4GH schema (https://ga4gh.github.io/workflow-execution-service-schemas/docs/). + +This module provides Pydantic models for the Workflow Execution Service (WES) schema, +supporting validation, serialization, and deserialization of WES data structures. +""" +from datetime import datetime +from enum import Enum +from typing import List, Optional, Union + +from pydantic import BaseModel, Field, root_validator, validator + +from crategen.converters.utils import convert_to_iso8601 + + +class State(str, Enum): + """Enumeration of workflow states in the Workflow Execution Service (WES). + + These states represent the different stages a workflow can be in during its lifecycle. + + **Attributes:** + + - **UNKNOWN**: The state of the workflow is unknown. + - **QUEUED**: The workflow is queued. + - **INITIALIZING**: The workflow is initializing. + - **RUNNING**: The workflow is running. + - **PAUSED**: The workflow is paused. + - **COMPLETE**: The workflow has completed successfully. + - **EXECUTOR_ERROR**: The workflow encountered an executor error. + - **SYSTEM_ERROR**: The workflow encountered a system error. + - **CANCELLED**: The workflow was cancelled by the user. + - **CANCELING**: The workflow is in the process of being cancelled. + - **PREEMPTED**: The workflow was preempted by the system. + + **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/state_enum + """ + UNKNOWN = "UNKNOWN" + QUEUED = "QUEUED" + INITIALIZING = "INITIALIZING" + RUNNING = "RUNNING" + PAUSED = "PAUSED" + COMPLETE = "COMPLETE" + EXECUTOR_ERROR = "EXECUTOR_ERROR" + SYSTEM_ERROR = "SYSTEM_ERROR" + CANCELLED = "CANCELLED" + CANCELING = "CANCELING" + PREEMPTED = "PREEMPTED" + + +class WESOutputs(BaseModel): + """Represents an output file or directory from a workflow run. + + **Attributes:** + + - **location** (`str`): The location (URL or path) where the output is stored. + - **name** (`str`): The name of the output file or directory. + + **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/outputs_model + """ + location: str + name: str + + +class Log(BaseModel): + """ + Represents a run log in the Workflow Execution Service (WES). + + **Attributes:** + + - **name** (`Optional[str]`): The task or workflow name. + - **cmd** (`Optional[list[str]]`): The command line that was executed. + - **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format. + - **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format. + - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. + - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. + - **exit_code** (`Optional[int]`): The exit code of the program. + - **system_logs** (`optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. + + **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model + """ + + name: Optional[str] = None + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + cmd: Optional[List[str]] = None + stdout: Optional[str] = None + stderr: Optional[str] = None + exit_code: Optional[int] = None + system_logs: Optional[List[str]] = None + + @validator("start_time", "end_time") + def validate_datetime(cls, value): + """Validate and convert datetime values to RFC3339/ISO8601 format. + + This validator handles both datetime objects and string representations, + converting them to a consistent ISO8601 format with UTC timezone (Z suffix). + + Args: + value (Union[datetime, str, None]): The datetime value to validate and convert + + Returns: + Optional[str]: The formatted datetime string, or None if input was None + """ + # Handle both string and datetime objects + if value is None: + return None + # If it's already a datetime object, convert it to ISO format + if isinstance(value, datetime): + return value.isoformat() + "Z" + # Otherwise, use the utility function for string conversion + return convert_to_iso8601(value) + + +class TaskLog(Log): + """ + Represents a task log in the Workflow Execution Service (WES). + + **Attributes:** + + - **name** (`str`): The task or workflow name. + - **cmd** (`Optional[list[str]]`): The command line that was executed. + - **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format. + - **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format. + - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. + - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. + - **exit_code** (`Optional[int]`): The exit code of the program. + - **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. + - **id** (`str`): A unique identifier which maybe used to reference the task. + - **tes_uri** (`Optional[str]`): An optional URL pointing to an extended task definition defined by a TES api. + + **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model + """ + + id: str + tes_uri: Optional[str] = None + name: str = Field(...) + + +class RunRequest(BaseModel): + """ + Represents a workflow request in WES. + + **Attributes:** + + - **workflow_params** (`Optional[dict[str, str]]`): The workflow run parameterizations (JSON encoded), + including input and output file locations. + - **workflow_type** (`str`): The workflow descriptor type. + - **workflow_type_version** (`str`): The workflow descriptor type version. + - **tags** (`Optional[dict[str, str]]`): Additional tags associated with the workflow. + - **workflow_engine_parameters** (Optional[dict[str, str]]): Input values specific to the workflow engine. + - **workflow_engine** (`Optional[str]`): The workflow engine. + - **workflow_engine_version** (`Optional[str]`): The workflow engine version. + - **workflow_url** (`str`): The workflow url. + + **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runrequest_model + """ + + workflow_params: dict[str, str] + workflow_type: str + workflow_type_version: str + tags: Optional[dict[str, str]] = {} + workflow_engine_parameters: Optional[dict[str, str]] = None + workflow_engine: Optional[str] = None + workflow_engine_version: Optional[str] = None + workflow_url: str + + @root_validator() + def validate_workflow_engine(cls, values): + """Validate workflow engine dependencies. + + If workflow_engine_version is set, then workflow_engine must also be set. + + Args: + values (dict): The model field values + + Returns: + dict: The validated field values + + Raises: + ValueError: If workflow_engine_version is set but workflow_engine is not + """ + engine_version = values.get("workflow_engine_version") + engine = values.get("workflow_engine") + if engine_version is not None and engine is None: + raise ValueError( + "The 'workflow_engine' attribute is required when the 'workflow_engine_version' attribute is set" + ) + return values + + +class WESData(BaseModel): + """ + Represents a WES run. + + **Attributes:** + + - **run_id** (`str`): The unique identifier for the WES run. + - **request** (`Optional[RunRequest]`): The request associated with the WES run. + - **state** (`Optional[State]`): The state of the WES run. + - **run_log** (`Optional[Log]`): The log of the WES run. + - **task_logs_url** (`Optional[str]`): A reference to the complete url which may be used + to obtain a paginated list of task logs for this workflow. + - **task_logs** (`Optional[list[Log | TaskLog] | None]`): The logs of individual tasks within the run. + This attribute is deprecated. + - **outputs** (`dict[str, str]`): The outputs of the WES run. + + **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/run_model + """ + + run_id: str + request: Optional[RunRequest] = None + state: Optional[State] = None + run_log: Optional[Log] = None + task_logs_url: Optional[str] = None + task_logs: Optional[List[Union[Log, TaskLog]]] = None + outputs: dict[str, str] = {} + + @root_validator + def check_deprecated_fields(cls, values): + """Check for usage of deprecated fields and issue warnings. + + Currently warns about task_logs field which is deprecated in favor of task_logs_url. + + Args: + values (dict): The model field values + + Returns: + dict: The field values unchanged + """ + if values.get("task_logs") is not None: + print( + "DeprecationWarning: The 'task_logs' field is deprecated and will be removed in future versions. Use 'task_logs_url' instead." + ) + return values diff --git a/tests/unit/test_wes_models.py b/tests/unit/test_wes_models.py new file mode 100644 index 0000000..6f96a64 --- /dev/null +++ b/tests/unit/test_wes_models.py @@ -0,0 +1,281 @@ +"""Tests for WES models""" + +import datetime +import io +import sys +import pytest +from unittest.mock import patch + +from crategen.models.wes_models import ( + State, + WESOutputs, + Log, + TaskLog, + RunRequest, + WESData +) + +# Test data constants +valid_datetime_strings = [ + "2020-10-02T16:00:00.000Z", + "2024-10-15T18:14:34+00:00", + "2024-10-15T18:14:34.948996+00:00", + "2024-10-15T19:01:06.872464+00:00", +] + +invalid_datetime_strings = [ + "2020-10-02 16:00:00", # Missing 'T' separator + "2020-10-02T16:00:00", # Missing timezone + "20201002T160000Z", # Missing separators + "2020-10-02T16:00:00.000+0200", # Invalid timezone format + "2020-10-02T16:00:00.000 GMT", # Invalid timezone format + "02-10-2020T16:00:00.000Z", # Incorrect date order +] + +test_workflow_url = "https://raw.githubusercontent.com/example/workflow.cwl" + +class TestState: + """Test suite for State enum""" + + def test_state_enum_values(self): + """Test that State enum has correct values""" + assert State.UNKNOWN == "UNKNOWN" + assert State.QUEUED == "QUEUED" + assert State.INITIALIZING == "INITIALIZING" + assert State.RUNNING == "RUNNING" + assert State.PAUSED == "PAUSED" + assert State.COMPLETE == "COMPLETE" + assert State.EXECUTOR_ERROR == "EXECUTOR_ERROR" + assert State.SYSTEM_ERROR == "SYSTEM_ERROR" + assert State.CANCELLED == "CANCELLED" + assert State.CANCELING == "CANCELING" + assert State.PREEMPTED == "PREEMPTED" + +class TestWESOutputs: + """Test suite for WESOutputs model""" + + def test_wes_outputs_creation(self): + """Test creating WESOutputs objects""" + # Test with minimal required fields + outputs = WESOutputs(location="/path/to/output", name="output_file") + assert outputs.location == "/path/to/output" + assert outputs.name == "output_file" + + # Test with edge case values + outputs = WESOutputs(location="s3://bucket/key", name="") + assert outputs.location == "s3://bucket/key" + assert outputs.name == "" + +class TestLog: + """Test suite for Log model""" + + def test_log_creation_with_minimal_fields(self): + """Test creating Log with minimal fields""" + log = Log() + assert log.name is None + assert log.cmd is None + assert log.start_time is None + assert log.end_time is None + assert log.stdout is None + assert log.stderr is None + assert log.exit_code is None + assert log.system_logs is None + + def test_log_creation_with_all_fields(self): + """Test creating Log with all fields""" + log = Log( + name="workflow_run", + cmd=["cwltool", "workflow.cwl", "inputs.json"], + start_time=datetime.datetime(2023, 1, 1, 12, 0, 0), + end_time=datetime.datetime(2023, 1, 1, 12, 30, 0), + stdout="https://example.com/stdout.log", + stderr="https://example.com/stderr.log", + exit_code=0, + system_logs=["Starting job", "Job completed"] + ) + assert log.name == "workflow_run" + assert log.cmd == ["cwltool", "workflow.cwl", "inputs.json"] + assert log.stdout == "https://example.com/stdout.log" + assert log.stderr == "https://example.com/stderr.log" + assert log.exit_code == 0 + assert log.system_logs == ["Starting job", "Job completed"] + # Datetime fields are validated and converted to RFC3339 + +class TestTaskLog: + """Test suite for TaskLog model""" + + def test_task_log_creation(self): + """Test creating TaskLog objects""" + # Test with required fields + task_log = TaskLog( + id="task-001", + name="alignment_task", + ) + assert task_log.id == "task-001" + assert task_log.name == "alignment_task" + assert task_log.tes_uri is None + + # Test with all fields + task_log = TaskLog( + id="task-002", + name="variant_calling", + tes_uri="https://tes-service.org/tasks/task-002", + cmd=["samtools", "mpileup", "-f", "ref.fa", "input.bam"], + exit_code=0 + ) + assert task_log.id == "task-002" + assert task_log.name == "variant_calling" + assert task_log.tes_uri == "https://tes-service.org/tasks/task-002" + assert task_log.cmd == ["samtools", "mpileup", "-f", "ref.fa", "input.bam"] + assert task_log.exit_code == 0 + + def test_task_log_name_required(self): + """Test that name is required for TaskLog""" + with pytest.raises(ValueError): + TaskLog(id="task-001") + +class TestRunRequest: + """Test suite for RunRequest model""" + + def test_run_request_minimal(self): + """Test creating RunRequest with minimal fields""" + request = RunRequest( + workflow_params={"input": "input.txt", "output": "output.txt"}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url="https://example.com/workflow.cwl" + ) + assert request.workflow_params == {"input": "input.txt", "output": "output.txt"} + assert request.workflow_type == "CWL" + assert request.workflow_type_version == "v1.0" + assert request.workflow_url == "https://example.com/workflow.cwl" + assert request.tags == {} + assert request.workflow_engine_parameters is None + assert request.workflow_engine is None + assert request.workflow_engine_version is None + + def test_run_request_all_fields(self): + """Test creating RunRequest with all fields""" + request = RunRequest( + workflow_params={"input": "input.txt", "output": "output.txt"}, + workflow_type="WDL", + workflow_type_version="1.0", + workflow_url="https://example.com/workflow.wdl", + tags={"project": "genomics", "priority": "high"}, + workflow_engine_parameters={"memory": "16G"}, + workflow_engine="cromwell", + workflow_engine_version="52.0.1" + ) + assert request.workflow_params == {"input": "input.txt", "output": "output.txt"} + assert request.workflow_type == "WDL" + assert request.workflow_type_version == "1.0" + assert request.workflow_url == "https://example.com/workflow.wdl" + assert request.tags == {"project": "genomics", "priority": "high"} + assert request.workflow_engine_parameters == {"memory": "16G"} + assert request.workflow_engine == "cromwell" + assert request.workflow_engine_version == "52.0.1" + + def test_workflow_engine_validation(self): + """Test that workflow_engine is required when workflow_engine_version is set""" + # Valid: No version specified + request = RunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url="https://example.com/workflow.cwl" + ) + assert request.workflow_engine is None + assert request.workflow_engine_version is None + + # Valid: Both engine and version specified + request = RunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url="https://example.com/workflow.cwl", + workflow_engine="cwltool", + workflow_engine_version="3.1.20220502201013" + ) + assert request.workflow_engine == "cwltool" + assert request.workflow_engine_version == "3.1.20220502201013" + + # Invalid: Version without engine + with pytest.raises(ValueError) as exc_info: + RunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url="https://example.com/workflow.cwl", + workflow_engine_version="3.1.20220502201013" + ) + assert "workflow_engine" in str(exc_info.value) + +class TestWESData: + """Test suite for WESData model""" + + def test_wes_data_minimal(self): + """Test creating WESData with minimal fields""" + wes_data = WESData(run_id="run-001") + assert wes_data.run_id == "run-001" + assert wes_data.request is None + assert wes_data.state is None + assert wes_data.run_log is None + assert wes_data.task_logs_url is None + assert wes_data.task_logs is None + assert wes_data.outputs == {} + + def test_wes_data_complete(self): + """Test creating WESData with all fields""" + request = RunRequest( + workflow_params={"input": "input.txt"}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url="https://example.com/workflow.cwl" + ) + run_log = Log( + name="workflow_run", + cmd=["cwltool", "workflow.cwl", "inputs.json"], + exit_code=0 + ) + task_log = TaskLog( + id="task-001", + name="data_processing" + ) + + wes_data = WESData( + run_id="run-001", + request=request, + state=State.COMPLETE, + run_log=run_log, + task_logs_url="https://example.com/tasks", + task_logs=[task_log], + outputs={"result": "https://example.com/results/run-001.txt"} + ) + + assert wes_data.run_id == "run-001" + assert wes_data.request == request + assert wes_data.state == State.COMPLETE + assert wes_data.run_log == run_log + assert wes_data.task_logs_url == "https://example.com/tasks" + assert wes_data.task_logs == [task_log] + assert wes_data.outputs == {"result": "https://example.com/results/run-001.txt"} + + def test_task_logs_deprecation_warning(self): + """Test deprecation warning when task_logs is used""" + task_log = TaskLog(id="task-001", name="test") + + # Capture stdout to test the deprecation warning + captured_output = io.StringIO() + sys.stdout = captured_output + + WESData( + run_id="run-001", + task_logs=[task_log] + ) + + sys.stdout = sys.__stdout__ + output = captured_output.getvalue() + + assert "DeprecationWarning" in output + assert "task_logs" in output + assert "task_logs_url" in output # Fixed: was 'tes_logs_url', should be 'task_logs_url' \ No newline at end of file From ee1ff81b6001b9d41f8d465b112fa1ff4a1eb2c5 Mon Sep 17 00:00:00 2001 From: Karanjot786 Date: Tue, 18 Mar 2025 11:05:04 +0530 Subject: [PATCH 2/6] feat: add WES models with unit tests --- crategen/models/__init__.py | 17 ++ crategen/models/wes_models.py | 59 +++++++ tests/unit/test_wes_models.py | 308 +++++++++++++++++++++++++++++++++- 3 files changed, 383 insertions(+), 1 deletion(-) diff --git a/crategen/models/__init__.py b/crategen/models/__init__.py index a35d3aa..ada133b 100644 --- a/crategen/models/__init__.py +++ b/crategen/models/__init__.py @@ -18,12 +18,21 @@ TESTaskLog, ) from .wes_models import ( +<<<<<<< HEAD State, WESOutputs, Log, TaskLog, RunRequest, WESData, +======= + WESData, + WESLog, + WESOutputs, + WESRunRequest, + WESState, + WESTaskLog, +>>>>>>> e2e7014 (feat: add WES models with unit tests) ) __all__ = [ @@ -40,10 +49,18 @@ "TESState", # WES Models +<<<<<<< HEAD "State", "WESOutputs", "Log", "TaskLog", "RunRequest", +======= + "WESState", + "WESOutputs", + "WESLog", + "WESTaskLog", + "WESRunRequest", +>>>>>>> e2e7014 (feat: add WES models with unit tests) "WESData", ] diff --git a/crategen/models/wes_models.py b/crategen/models/wes_models.py index b4b465f..15f3284 100644 --- a/crategen/models/wes_models.py +++ b/crategen/models/wes_models.py @@ -4,16 +4,26 @@ This module provides Pydantic models for the Workflow Execution Service (WES) schema, supporting validation, serialization, and deserialization of WES data structures. """ +<<<<<<< HEAD from datetime import datetime +======= +>>>>>>> e2e7014 (feat: add WES models with unit tests) from enum import Enum from typing import List, Optional, Union from pydantic import BaseModel, Field, root_validator, validator +<<<<<<< HEAD from crategen.converters.utils import convert_to_iso8601 class State(str, Enum): +======= +from rfc3339_validator import validate_rfc3339 # type: ignore + + +class WESState(str, Enum): +>>>>>>> e2e7014 (feat: add WES models with unit tests) """Enumeration of workflow states in the Workflow Execution Service (WES). These states represent the different stages a workflow can be in during its lifecycle. @@ -61,7 +71,11 @@ class WESOutputs(BaseModel): name: str +<<<<<<< HEAD class Log(BaseModel): +======= +class WESLog(BaseModel): +>>>>>>> e2e7014 (feat: add WES models with unit tests) """ Represents a run log in the Workflow Execution Service (WES). @@ -69,8 +83,13 @@ class Log(BaseModel): - **name** (`Optional[str]`): The task or workflow name. - **cmd** (`Optional[list[str]]`): The command line that was executed. +<<<<<<< HEAD - **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format. - **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format. +======= + - **start_time** (`Optional[str]`): When the command started executing, in RFC 3339 format. + - **end_time** (`Optional[str]`): When the command stopped executing, in RFC 3339 format. +>>>>>>> e2e7014 (feat: add WES models with unit tests) - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. - **exit_code** (`Optional[int]`): The exit code of the program. @@ -80,14 +99,20 @@ class Log(BaseModel): """ name: Optional[str] = None +<<<<<<< HEAD start_time: Optional[datetime] = None end_time: Optional[datetime] = None +======= + start_time: Optional[str] = None + end_time: Optional[str] = None +>>>>>>> e2e7014 (feat: add WES models with unit tests) cmd: Optional[List[str]] = None stdout: Optional[str] = None stderr: Optional[str] = None exit_code: Optional[int] = None system_logs: Optional[List[str]] = None +<<<<<<< HEAD @validator("start_time", "end_time") def validate_datetime(cls, value): """Validate and convert datetime values to RFC3339/ISO8601 format. @@ -112,6 +137,19 @@ def validate_datetime(cls, value): class TaskLog(Log): +======= + @validator("start_time", "end_time", allow_reuse=True) + def validate_datetime(cls, value, field): + """Check correct datetime format is RFC 3339""" + if value and not validate_rfc3339(value): + raise ValueError( + f"The '{field.name}' property must be in the RFC 3339 format" + ) + return value + + +class WESTaskLog(WESLog): +>>>>>>> e2e7014 (feat: add WES models with unit tests) """ Represents a task log in the Workflow Execution Service (WES). @@ -136,7 +174,11 @@ class TaskLog(Log): name: str = Field(...) +<<<<<<< HEAD class RunRequest(BaseModel): +======= +class WESRunRequest(BaseModel): +>>>>>>> e2e7014 (feat: add WES models with unit tests) """ Represents a workflow request in WES. @@ -195,12 +237,21 @@ class WESData(BaseModel): **Attributes:** - **run_id** (`str`): The unique identifier for the WES run. +<<<<<<< HEAD - **request** (`Optional[RunRequest]`): The request associated with the WES run. - **state** (`Optional[State]`): The state of the WES run. - **run_log** (`Optional[Log]`): The log of the WES run. - **task_logs_url** (`Optional[str]`): A reference to the complete url which may be used to obtain a paginated list of task logs for this workflow. - **task_logs** (`Optional[list[Log | TaskLog] | None]`): The logs of individual tasks within the run. +======= + - **request** (`Optional[WESRunRequest]`): The request associated with the WES run. + - **state** (`Optional[WESState]`): The state of the WES run. + - **run_log** (`Optional[WESLog]`): The log of the WES run. + - **task_logs_url** (`Optional[str]`): A reference to the complete url which may be used + to obtain a paginated list of task logs for this workflow. + - **task_logs** (`Optional[list[WESLog | WESTaskLog] | None]`): The logs of individual tasks within the run. +>>>>>>> e2e7014 (feat: add WES models with unit tests) This attribute is deprecated. - **outputs** (`dict[str, str]`): The outputs of the WES run. @@ -208,11 +259,19 @@ class WESData(BaseModel): """ run_id: str +<<<<<<< HEAD request: Optional[RunRequest] = None state: Optional[State] = None run_log: Optional[Log] = None task_logs_url: Optional[str] = None task_logs: Optional[List[Union[Log, TaskLog]]] = None +======= + request: Optional[WESRunRequest] = None + state: Optional[WESState] = None + run_log: Optional[WESLog] = None + task_logs_url: Optional[str] = None + task_logs: Optional[List[Union[WESLog, WESTaskLog]]] = None +>>>>>>> e2e7014 (feat: add WES models with unit tests) outputs: dict[str, str] = {} @root_validator diff --git a/tests/unit/test_wes_models.py b/tests/unit/test_wes_models.py index 6f96a64..806059f 100644 --- a/tests/unit/test_wes_models.py +++ b/tests/unit/test_wes_models.py @@ -1,4 +1,5 @@ """Tests for WES models""" +<<<<<<< HEAD import datetime import io @@ -13,6 +14,20 @@ TaskLog, RunRequest, WESData +======= +import io +import sys + +import pytest + +from crategen.models.wes_models import ( + WESData, + WESLog, + WESOutputs, + WESRunRequest, + WESState, + WESTaskLog, +>>>>>>> e2e7014 (feat: add WES models with unit tests) ) # Test data constants @@ -32,6 +47,7 @@ "02-10-2020T16:00:00.000Z", # Incorrect date order ] +<<<<<<< HEAD test_workflow_url = "https://raw.githubusercontent.com/example/workflow.cwl" class TestState: @@ -50,12 +66,56 @@ def test_state_enum_values(self): assert State.CANCELLED == "CANCELLED" assert State.CANCELING == "CANCELING" assert State.PREEMPTED == "PREEMPTED" +======= +test_url = "https://raw.githubusercontent.com/elixir-cloud-aai/CrateGen/refs/heads/main/README.md" + +MAX_SYSTEM_LOGS = 2 + +# Test data samples +test_workflow_params = { + "reads": f"{test_url}/reads.fastq", + "reference": f"{test_url}/reference.fa", + "output_dir": f"{test_url}/results/" +} + +test_workflow_engine_params = { + "memory": "16GB", + "cpu": "4", + "disk_size": "100GB" +} + +test_tags = { + "project": "genomics-pipeline", + "sample": "TCGA-AB-2823", + "analysis": "variant-calling" +} + + +class TestWESState: + """Test suite for WESState enum""" + + def test_state_enum_values(self): + """Test that WESState enum has correct values""" + assert WESState.UNKNOWN == "UNKNOWN" + assert WESState.QUEUED == "QUEUED" + assert WESState.INITIALIZING == "INITIALIZING" + assert WESState.RUNNING == "RUNNING" + assert WESState.PAUSED == "PAUSED" + assert WESState.COMPLETE == "COMPLETE" + assert WESState.EXECUTOR_ERROR == "EXECUTOR_ERROR" + assert WESState.SYSTEM_ERROR == "SYSTEM_ERROR" + assert WESState.CANCELLED == "CANCELLED" + assert WESState.CANCELING == "CANCELING" + assert WESState.PREEMPTED == "PREEMPTED" + +>>>>>>> e2e7014 (feat: add WES models with unit tests) class TestWESOutputs: """Test suite for WESOutputs model""" def test_wes_outputs_creation(self): """Test creating WESOutputs objects""" +<<<<<<< HEAD # Test with minimal required fields outputs = WESOutputs(location="/path/to/output", name="output_file") assert outputs.location == "/path/to/output" @@ -72,6 +132,31 @@ class TestLog: def test_log_creation_with_minimal_fields(self): """Test creating Log with minimal fields""" log = Log() +======= + # Test with real URLs + outputs = WESOutputs( + location="https://storage.googleapis.com/workflow-outputs/run123/result.bam", + name="alignment.bam" + ) + assert outputs.location.startswith("https://") + assert outputs.name == "alignment.bam" + + # Test with different output types + outputs = WESOutputs( + location="https://storage.googleapis.com/workflow-outputs/run123/variants.vcf", + name="variants.vcf" + ) + assert "variants.vcf" in outputs.location + assert outputs.name.endswith(".vcf") + + +class TestWESLog: + """Test suite for WESLog model""" + + def test_log_creation_with_minimal_fields(self): + """Test creating WESLog with minimal fields""" + log = WESLog() +>>>>>>> e2e7014 (feat: add WES models with unit tests) assert log.name is None assert log.cmd is None assert log.start_time is None @@ -81,6 +166,7 @@ def test_log_creation_with_minimal_fields(self): assert log.exit_code is None assert log.system_logs is None +<<<<<<< HEAD def test_log_creation_with_all_fields(self): """Test creating Log with all fields""" log = Log( @@ -183,20 +269,149 @@ def test_workflow_engine_validation(self): workflow_type="CWL", workflow_type_version="v1.0", workflow_url="https://example.com/workflow.cwl" +======= + @pytest.mark.parametrize("valid_datetime", valid_datetime_strings) + def test_log_creation_with_all_fields(self, valid_datetime): + """Test creating WESLog with all fields""" + log = WESLog( + name="workflow_123", + cmd=["cwltool", "--non-strict", test_url, test_url], + start_time=valid_datetime, + end_time=valid_datetime, + stdout="https://storage.googleapis.com/workflow-logs/run123/stdout.log", + stderr="https://storage.googleapis.com/workflow-logs/run123/stderr.log", + exit_code=0, + system_logs=[ + "Workflow engine initialized", + "Downloading CWL workflow from GitHub", + "Execution completed successfully" + ] + ) + assert log.name == "workflow_123" + assert test_url in log.cmd + assert log.stdout.startswith("https://") + assert log.stderr.startswith("https://") + assert log.exit_code == 0 + assert len(log.system_logs) == MAX_SYSTEM_LOGS + + @pytest.mark.parametrize("invalid_datetime", invalid_datetime_strings) + def test_log_datetime_validation(self, invalid_datetime): + """Test datetime validation in WESLog""" + with pytest.raises(ValueError) as exc_info: + WESLog( + name="workflow_123", + start_time=invalid_datetime + ) + assert "format" in str(exc_info.value) + + +class TestWESTaskLog: + """Test suite for WESTaskLog model""" + + def test_task_log_creation(self): + """Test creating WESTaskLog objects""" + # Test with required fields and real URLs + task_log = WESTaskLog( + id="task-bwa-mem-123", + name="bwa_mem_alignment", + cmd=["bwa", "mem", "-t", "4", "reference.fa", "reads.fastq"], + stdout="https://storage.googleapis.com/workflow-logs/task123/stdout.log", + stderr="https://storage.googleapis.com/workflow-logs/task123/stderr.log", + exit_code=0, + tes_uri=test_url + ) + assert task_log.id.startswith("task-") + assert task_log.name == "bwa_mem_alignment" + assert task_log.stdout.startswith("https://") + assert task_log.stderr.startswith("https://") + assert task_log.tes_uri == test_url + + def test_task_log_name_required(self): + """Test that name is required for WESTaskLog""" + with pytest.raises(ValueError): + WESTaskLog(id="task-123") + + +class TestWESRunRequest: + """Test suite for WESRunRequest model""" + + def test_run_request_minimal(self): + """Test creating WESRunRequest with minimal fields""" + request = WESRunRequest( + workflow_params={ + "input_reads": "https://storage.googleapis.com/sample-data/reads.fastq", + "output_path": "s3://my-bucket/results/" + }, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url=test_url + ) + assert request.workflow_type == "CWL" + assert request.workflow_url == test_url + assert request.workflow_params["input_reads"].startswith("https://") + assert request.workflow_params["output_path"].startswith("s3://") + + def test_run_request_all_fields(self): + """Test creating WESRunRequest with all fields""" + request = WESRunRequest( + workflow_params={ + "reads": "https://storage.googleapis.com/sample-data/tumor.bam", + "reference": "https://storage.googleapis.com/references/hg38.fa", + "output_dir": "s3://my-bucket/variant-calls/" + }, + workflow_type="WDL", + workflow_type_version="1.0", + workflow_url="https://raw.githubusercontent.com/broadinstitute/gatk-workflows/master/mutect2.wdl", + tags={ + "project": "cancer-genomics", + "sample": "TCGA-AB-2823", + "analysis": "somatic-variant-calling" + }, + workflow_engine_parameters={ + "memory": "16GB", + "cpu": "4", + "disk_size": "100GB" + }, + workflow_engine="cromwell", + workflow_engine_version="52.0.1" + ) + assert request.workflow_type == "WDL" + assert request.workflow_url.endswith(".wdl") + assert request.tags["project"] == "cancer-genomics" + assert request.workflow_engine == "cromwell" + + def test_workflow_engine_validation(self): + """Test workflow engine validation rules""" + # Valid: No version specified + request = WESRunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url=test_url +>>>>>>> e2e7014 (feat: add WES models with unit tests) ) assert request.workflow_engine is None assert request.workflow_engine_version is None # Valid: Both engine and version specified +<<<<<<< HEAD request = RunRequest( workflow_params={}, workflow_type="CWL", workflow_type_version="v1.0", workflow_url="https://example.com/workflow.cwl", +======= + request = WESRunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url=test_url, +>>>>>>> e2e7014 (feat: add WES models with unit tests) workflow_engine="cwltool", workflow_engine_version="3.1.20220502201013" ) assert request.workflow_engine == "cwltool" +<<<<<<< HEAD assert request.workflow_engine_version == "3.1.20220502201013" # Invalid: Version without engine @@ -206,17 +421,37 @@ def test_workflow_engine_validation(self): workflow_type="CWL", workflow_type_version="v1.0", workflow_url="https://example.com/workflow.cwl", +======= + assert request.workflow_engine_version.startswith("3.1") + + # Invalid: Version without engine + with pytest.raises(ValueError) as exc_info: + WESRunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url=test_url, +>>>>>>> e2e7014 (feat: add WES models with unit tests) workflow_engine_version="3.1.20220502201013" ) assert "workflow_engine" in str(exc_info.value) +<<<<<<< HEAD +======= + +>>>>>>> e2e7014 (feat: add WES models with unit tests) class TestWESData: """Test suite for WESData model""" def test_wes_data_minimal(self): """Test creating WESData with minimal fields""" +<<<<<<< HEAD wes_data = WESData(run_id="run-001") assert wes_data.run_id == "run-001" +======= + wes_data = WESData(run_id="wes-run-123") + assert wes_data.run_id.startswith("wes-") +>>>>>>> e2e7014 (feat: add WES models with unit tests) assert wes_data.request is None assert wes_data.state is None assert wes_data.run_log is None @@ -226,6 +461,7 @@ def test_wes_data_minimal(self): def test_wes_data_complete(self): """Test creating WESData with all fields""" +<<<<<<< HEAD request = RunRequest( workflow_params={"input": "input.txt"}, workflow_type="CWL", @@ -263,13 +499,79 @@ def test_wes_data_complete(self): def test_task_logs_deprecation_warning(self): """Test deprecation warning when task_logs is used""" task_log = TaskLog(id="task-001", name="test") +======= + request = WESRunRequest( + workflow_params={ + "reads": "https://storage.googleapis.com/sample-data/sample.fastq", + "output_dir": "s3://my-bucket/results/" + }, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url=test_url + ) + + run_log = WESLog( + name="variant_calling_workflow", + cmd=["cwltool", "--non-strict", test_url, test_url], + stdout="https://storage.googleapis.com/workflow-logs/run123/stdout.log", + stderr="https://storage.googleapis.com/workflow-logs/run123/stderr.log", + exit_code=0, + system_logs=["Workflow started", "Workflow completed"] + ) + + task_log = WESTaskLog( + id="task-123", + name="variant_calling", + cmd=["gatk", "HaplotypeCaller", "-R", "reference.fa", "-I", "input.bam"], + stdout="https://storage.googleapis.com/workflow-logs/task123/stdout.log", + stderr="https://storage.googleapis.com/workflow-logs/task123/stderr.log", + exit_code=0, + tes_uri=test_url + ) + + wes_data = WESData( + run_id="wes-run-123", + request=request, + state=WESState.COMPLETE, + run_log=run_log, + task_logs_url="https://storage.googleapis.com/workflow-logs/run123/tasks/", + task_logs=[task_log], + outputs={ + "aligned_bam": "https://storage.googleapis.com/workflow-outputs/run123/aligned.bam", + "variants_vcf": "https://storage.googleapis.com/workflow-outputs/run123/variants.vcf" + } + ) + + assert wes_data.run_id == "wes-run-123" + assert wes_data.request == request + assert wes_data.state == WESState.COMPLETE + assert wes_data.run_log == run_log + assert wes_data.task_logs_url.startswith("https://") + assert wes_data.task_logs == [task_log] + assert all(url.startswith("https://") for url in wes_data.outputs.values()) + + def test_task_logs_deprecation_warning(self): + """Test deprecation warning when task_logs is used""" + task_log = WESTaskLog( + id="task-123", + name="alignment", + cmd=["bwa", "mem", "ref.fa", "reads.fastq"], + stdout="https://storage.googleapis.com/logs/task123/stdout.log", + stderr="https://storage.googleapis.com/logs/task123/stderr.log", + exit_code=0 + ) +>>>>>>> e2e7014 (feat: add WES models with unit tests) # Capture stdout to test the deprecation warning captured_output = io.StringIO() sys.stdout = captured_output WESData( +<<<<<<< HEAD run_id="run-001", +======= + run_id="wes-run-123", +>>>>>>> e2e7014 (feat: add WES models with unit tests) task_logs=[task_log] ) @@ -278,4 +580,8 @@ def test_task_logs_deprecation_warning(self): assert "DeprecationWarning" in output assert "task_logs" in output - assert "task_logs_url" in output # Fixed: was 'tes_logs_url', should be 'task_logs_url' \ No newline at end of file +<<<<<<< HEAD + assert "task_logs_url" in output # Fixed: was 'tes_logs_url', should be 'task_logs_url' +======= + assert "task_logs_url" in output +>>>>>>> e2e7014 (feat: add WES models with unit tests) From b860a7e3ee065d7e465939fa509320218b676aca Mon Sep 17 00:00:00 2001 From: Karanjot786 Date: Tue, 18 Mar 2025 11:10:10 +0530 Subject: [PATCH 3/6] feat: add WES models with unit tests --- crategen/models/__init__.py | 17 --- crategen/models/wes_models.py | 74 ----------- tests/unit/test_wes_models.py | 239 ---------------------------------- 3 files changed, 330 deletions(-) diff --git a/crategen/models/__init__.py b/crategen/models/__init__.py index ada133b..d39758f 100644 --- a/crategen/models/__init__.py +++ b/crategen/models/__init__.py @@ -18,21 +18,12 @@ TESTaskLog, ) from .wes_models import ( -<<<<<<< HEAD - State, - WESOutputs, - Log, - TaskLog, - RunRequest, - WESData, -======= WESData, WESLog, WESOutputs, WESRunRequest, WESState, WESTaskLog, ->>>>>>> e2e7014 (feat: add WES models with unit tests) ) __all__ = [ @@ -49,18 +40,10 @@ "TESState", # WES Models -<<<<<<< HEAD - "State", - "WESOutputs", - "Log", - "TaskLog", - "RunRequest", -======= "WESState", "WESOutputs", "WESLog", "WESTaskLog", "WESRunRequest", ->>>>>>> e2e7014 (feat: add WES models with unit tests) "WESData", ] diff --git a/crategen/models/wes_models.py b/crategen/models/wes_models.py index 15f3284..ed982fd 100644 --- a/crategen/models/wes_models.py +++ b/crategen/models/wes_models.py @@ -4,26 +4,14 @@ This module provides Pydantic models for the Workflow Execution Service (WES) schema, supporting validation, serialization, and deserialization of WES data structures. """ -<<<<<<< HEAD -from datetime import datetime -======= ->>>>>>> e2e7014 (feat: add WES models with unit tests) from enum import Enum from typing import List, Optional, Union from pydantic import BaseModel, Field, root_validator, validator -<<<<<<< HEAD - -from crategen.converters.utils import convert_to_iso8601 - - -class State(str, Enum): -======= from rfc3339_validator import validate_rfc3339 # type: ignore class WESState(str, Enum): ->>>>>>> e2e7014 (feat: add WES models with unit tests) """Enumeration of workflow states in the Workflow Execution Service (WES). These states represent the different stages a workflow can be in during its lifecycle. @@ -71,11 +59,7 @@ class WESOutputs(BaseModel): name: str -<<<<<<< HEAD -class Log(BaseModel): -======= class WESLog(BaseModel): ->>>>>>> e2e7014 (feat: add WES models with unit tests) """ Represents a run log in the Workflow Execution Service (WES). @@ -83,13 +67,8 @@ class WESLog(BaseModel): - **name** (`Optional[str]`): The task or workflow name. - **cmd** (`Optional[list[str]]`): The command line that was executed. -<<<<<<< HEAD - - **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format. - - **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format. -======= - **start_time** (`Optional[str]`): When the command started executing, in RFC 3339 format. - **end_time** (`Optional[str]`): When the command stopped executing, in RFC 3339 format. ->>>>>>> e2e7014 (feat: add WES models with unit tests) - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. - **exit_code** (`Optional[int]`): The exit code of the program. @@ -99,45 +78,14 @@ class WESLog(BaseModel): """ name: Optional[str] = None -<<<<<<< HEAD - start_time: Optional[datetime] = None - end_time: Optional[datetime] = None -======= start_time: Optional[str] = None end_time: Optional[str] = None ->>>>>>> e2e7014 (feat: add WES models with unit tests) cmd: Optional[List[str]] = None stdout: Optional[str] = None stderr: Optional[str] = None exit_code: Optional[int] = None system_logs: Optional[List[str]] = None -<<<<<<< HEAD - @validator("start_time", "end_time") - def validate_datetime(cls, value): - """Validate and convert datetime values to RFC3339/ISO8601 format. - - This validator handles both datetime objects and string representations, - converting them to a consistent ISO8601 format with UTC timezone (Z suffix). - - Args: - value (Union[datetime, str, None]): The datetime value to validate and convert - - Returns: - Optional[str]: The formatted datetime string, or None if input was None - """ - # Handle both string and datetime objects - if value is None: - return None - # If it's already a datetime object, convert it to ISO format - if isinstance(value, datetime): - return value.isoformat() + "Z" - # Otherwise, use the utility function for string conversion - return convert_to_iso8601(value) - - -class TaskLog(Log): -======= @validator("start_time", "end_time", allow_reuse=True) def validate_datetime(cls, value, field): """Check correct datetime format is RFC 3339""" @@ -149,7 +97,6 @@ def validate_datetime(cls, value, field): class WESTaskLog(WESLog): ->>>>>>> e2e7014 (feat: add WES models with unit tests) """ Represents a task log in the Workflow Execution Service (WES). @@ -174,11 +121,7 @@ class WESTaskLog(WESLog): name: str = Field(...) -<<<<<<< HEAD -class RunRequest(BaseModel): -======= class WESRunRequest(BaseModel): ->>>>>>> e2e7014 (feat: add WES models with unit tests) """ Represents a workflow request in WES. @@ -237,21 +180,12 @@ class WESData(BaseModel): **Attributes:** - **run_id** (`str`): The unique identifier for the WES run. -<<<<<<< HEAD - - **request** (`Optional[RunRequest]`): The request associated with the WES run. - - **state** (`Optional[State]`): The state of the WES run. - - **run_log** (`Optional[Log]`): The log of the WES run. - - **task_logs_url** (`Optional[str]`): A reference to the complete url which may be used - to obtain a paginated list of task logs for this workflow. - - **task_logs** (`Optional[list[Log | TaskLog] | None]`): The logs of individual tasks within the run. -======= - **request** (`Optional[WESRunRequest]`): The request associated with the WES run. - **state** (`Optional[WESState]`): The state of the WES run. - **run_log** (`Optional[WESLog]`): The log of the WES run. - **task_logs_url** (`Optional[str]`): A reference to the complete url which may be used to obtain a paginated list of task logs for this workflow. - **task_logs** (`Optional[list[WESLog | WESTaskLog] | None]`): The logs of individual tasks within the run. ->>>>>>> e2e7014 (feat: add WES models with unit tests) This attribute is deprecated. - **outputs** (`dict[str, str]`): The outputs of the WES run. @@ -259,19 +193,11 @@ class WESData(BaseModel): """ run_id: str -<<<<<<< HEAD - request: Optional[RunRequest] = None - state: Optional[State] = None - run_log: Optional[Log] = None - task_logs_url: Optional[str] = None - task_logs: Optional[List[Union[Log, TaskLog]]] = None -======= request: Optional[WESRunRequest] = None state: Optional[WESState] = None run_log: Optional[WESLog] = None task_logs_url: Optional[str] = None task_logs: Optional[List[Union[WESLog, WESTaskLog]]] = None ->>>>>>> e2e7014 (feat: add WES models with unit tests) outputs: dict[str, str] = {} @root_validator diff --git a/tests/unit/test_wes_models.py b/tests/unit/test_wes_models.py index 806059f..d64a9fe 100644 --- a/tests/unit/test_wes_models.py +++ b/tests/unit/test_wes_models.py @@ -1,20 +1,4 @@ """Tests for WES models""" -<<<<<<< HEAD - -import datetime -import io -import sys -import pytest -from unittest.mock import patch - -from crategen.models.wes_models import ( - State, - WESOutputs, - Log, - TaskLog, - RunRequest, - WESData -======= import io import sys @@ -27,7 +11,6 @@ WESRunRequest, WESState, WESTaskLog, ->>>>>>> e2e7014 (feat: add WES models with unit tests) ) # Test data constants @@ -47,26 +30,6 @@ "02-10-2020T16:00:00.000Z", # Incorrect date order ] -<<<<<<< HEAD -test_workflow_url = "https://raw.githubusercontent.com/example/workflow.cwl" - -class TestState: - """Test suite for State enum""" - - def test_state_enum_values(self): - """Test that State enum has correct values""" - assert State.UNKNOWN == "UNKNOWN" - assert State.QUEUED == "QUEUED" - assert State.INITIALIZING == "INITIALIZING" - assert State.RUNNING == "RUNNING" - assert State.PAUSED == "PAUSED" - assert State.COMPLETE == "COMPLETE" - assert State.EXECUTOR_ERROR == "EXECUTOR_ERROR" - assert State.SYSTEM_ERROR == "SYSTEM_ERROR" - assert State.CANCELLED == "CANCELLED" - assert State.CANCELING == "CANCELING" - assert State.PREEMPTED == "PREEMPTED" -======= test_url = "https://raw.githubusercontent.com/elixir-cloud-aai/CrateGen/refs/heads/main/README.md" MAX_SYSTEM_LOGS = 2 @@ -108,31 +71,12 @@ def test_state_enum_values(self): assert WESState.CANCELING == "CANCELING" assert WESState.PREEMPTED == "PREEMPTED" ->>>>>>> e2e7014 (feat: add WES models with unit tests) class TestWESOutputs: """Test suite for WESOutputs model""" def test_wes_outputs_creation(self): """Test creating WESOutputs objects""" -<<<<<<< HEAD - # Test with minimal required fields - outputs = WESOutputs(location="/path/to/output", name="output_file") - assert outputs.location == "/path/to/output" - assert outputs.name == "output_file" - - # Test with edge case values - outputs = WESOutputs(location="s3://bucket/key", name="") - assert outputs.location == "s3://bucket/key" - assert outputs.name == "" - -class TestLog: - """Test suite for Log model""" - - def test_log_creation_with_minimal_fields(self): - """Test creating Log with minimal fields""" - log = Log() -======= # Test with real URLs outputs = WESOutputs( location="https://storage.googleapis.com/workflow-outputs/run123/result.bam", @@ -156,7 +100,6 @@ class TestWESLog: def test_log_creation_with_minimal_fields(self): """Test creating WESLog with minimal fields""" log = WESLog() ->>>>>>> e2e7014 (feat: add WES models with unit tests) assert log.name is None assert log.cmd is None assert log.start_time is None @@ -166,110 +109,6 @@ def test_log_creation_with_minimal_fields(self): assert log.exit_code is None assert log.system_logs is None -<<<<<<< HEAD - def test_log_creation_with_all_fields(self): - """Test creating Log with all fields""" - log = Log( - name="workflow_run", - cmd=["cwltool", "workflow.cwl", "inputs.json"], - start_time=datetime.datetime(2023, 1, 1, 12, 0, 0), - end_time=datetime.datetime(2023, 1, 1, 12, 30, 0), - stdout="https://example.com/stdout.log", - stderr="https://example.com/stderr.log", - exit_code=0, - system_logs=["Starting job", "Job completed"] - ) - assert log.name == "workflow_run" - assert log.cmd == ["cwltool", "workflow.cwl", "inputs.json"] - assert log.stdout == "https://example.com/stdout.log" - assert log.stderr == "https://example.com/stderr.log" - assert log.exit_code == 0 - assert log.system_logs == ["Starting job", "Job completed"] - # Datetime fields are validated and converted to RFC3339 - -class TestTaskLog: - """Test suite for TaskLog model""" - - def test_task_log_creation(self): - """Test creating TaskLog objects""" - # Test with required fields - task_log = TaskLog( - id="task-001", - name="alignment_task", - ) - assert task_log.id == "task-001" - assert task_log.name == "alignment_task" - assert task_log.tes_uri is None - - # Test with all fields - task_log = TaskLog( - id="task-002", - name="variant_calling", - tes_uri="https://tes-service.org/tasks/task-002", - cmd=["samtools", "mpileup", "-f", "ref.fa", "input.bam"], - exit_code=0 - ) - assert task_log.id == "task-002" - assert task_log.name == "variant_calling" - assert task_log.tes_uri == "https://tes-service.org/tasks/task-002" - assert task_log.cmd == ["samtools", "mpileup", "-f", "ref.fa", "input.bam"] - assert task_log.exit_code == 0 - - def test_task_log_name_required(self): - """Test that name is required for TaskLog""" - with pytest.raises(ValueError): - TaskLog(id="task-001") - -class TestRunRequest: - """Test suite for RunRequest model""" - - def test_run_request_minimal(self): - """Test creating RunRequest with minimal fields""" - request = RunRequest( - workflow_params={"input": "input.txt", "output": "output.txt"}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url="https://example.com/workflow.cwl" - ) - assert request.workflow_params == {"input": "input.txt", "output": "output.txt"} - assert request.workflow_type == "CWL" - assert request.workflow_type_version == "v1.0" - assert request.workflow_url == "https://example.com/workflow.cwl" - assert request.tags == {} - assert request.workflow_engine_parameters is None - assert request.workflow_engine is None - assert request.workflow_engine_version is None - - def test_run_request_all_fields(self): - """Test creating RunRequest with all fields""" - request = RunRequest( - workflow_params={"input": "input.txt", "output": "output.txt"}, - workflow_type="WDL", - workflow_type_version="1.0", - workflow_url="https://example.com/workflow.wdl", - tags={"project": "genomics", "priority": "high"}, - workflow_engine_parameters={"memory": "16G"}, - workflow_engine="cromwell", - workflow_engine_version="52.0.1" - ) - assert request.workflow_params == {"input": "input.txt", "output": "output.txt"} - assert request.workflow_type == "WDL" - assert request.workflow_type_version == "1.0" - assert request.workflow_url == "https://example.com/workflow.wdl" - assert request.tags == {"project": "genomics", "priority": "high"} - assert request.workflow_engine_parameters == {"memory": "16G"} - assert request.workflow_engine == "cromwell" - assert request.workflow_engine_version == "52.0.1" - - def test_workflow_engine_validation(self): - """Test that workflow_engine is required when workflow_engine_version is set""" - # Valid: No version specified - request = RunRequest( - workflow_params={}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url="https://example.com/workflow.cwl" -======= @pytest.mark.parametrize("valid_datetime", valid_datetime_strings) def test_log_creation_with_all_fields(self, valid_datetime): """Test creating WESLog with all fields""" @@ -388,40 +227,20 @@ def test_workflow_engine_validation(self): workflow_type="CWL", workflow_type_version="v1.0", workflow_url=test_url ->>>>>>> e2e7014 (feat: add WES models with unit tests) ) assert request.workflow_engine is None assert request.workflow_engine_version is None # Valid: Both engine and version specified -<<<<<<< HEAD - request = RunRequest( - workflow_params={}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url="https://example.com/workflow.cwl", -======= request = WESRunRequest( workflow_params={}, workflow_type="CWL", workflow_type_version="v1.0", workflow_url=test_url, ->>>>>>> e2e7014 (feat: add WES models with unit tests) workflow_engine="cwltool", workflow_engine_version="3.1.20220502201013" ) assert request.workflow_engine == "cwltool" -<<<<<<< HEAD - assert request.workflow_engine_version == "3.1.20220502201013" - - # Invalid: Version without engine - with pytest.raises(ValueError) as exc_info: - RunRequest( - workflow_params={}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url="https://example.com/workflow.cwl", -======= assert request.workflow_engine_version.startswith("3.1") # Invalid: Version without engine @@ -431,27 +250,17 @@ def test_workflow_engine_validation(self): workflow_type="CWL", workflow_type_version="v1.0", workflow_url=test_url, ->>>>>>> e2e7014 (feat: add WES models with unit tests) workflow_engine_version="3.1.20220502201013" ) assert "workflow_engine" in str(exc_info.value) -<<<<<<< HEAD -======= - ->>>>>>> e2e7014 (feat: add WES models with unit tests) class TestWESData: """Test suite for WESData model""" def test_wes_data_minimal(self): """Test creating WESData with minimal fields""" -<<<<<<< HEAD - wes_data = WESData(run_id="run-001") - assert wes_data.run_id == "run-001" -======= wes_data = WESData(run_id="wes-run-123") assert wes_data.run_id.startswith("wes-") ->>>>>>> e2e7014 (feat: add WES models with unit tests) assert wes_data.request is None assert wes_data.state is None assert wes_data.run_log is None @@ -461,45 +270,6 @@ def test_wes_data_minimal(self): def test_wes_data_complete(self): """Test creating WESData with all fields""" -<<<<<<< HEAD - request = RunRequest( - workflow_params={"input": "input.txt"}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url="https://example.com/workflow.cwl" - ) - run_log = Log( - name="workflow_run", - cmd=["cwltool", "workflow.cwl", "inputs.json"], - exit_code=0 - ) - task_log = TaskLog( - id="task-001", - name="data_processing" - ) - - wes_data = WESData( - run_id="run-001", - request=request, - state=State.COMPLETE, - run_log=run_log, - task_logs_url="https://example.com/tasks", - task_logs=[task_log], - outputs={"result": "https://example.com/results/run-001.txt"} - ) - - assert wes_data.run_id == "run-001" - assert wes_data.request == request - assert wes_data.state == State.COMPLETE - assert wes_data.run_log == run_log - assert wes_data.task_logs_url == "https://example.com/tasks" - assert wes_data.task_logs == [task_log] - assert wes_data.outputs == {"result": "https://example.com/results/run-001.txt"} - - def test_task_logs_deprecation_warning(self): - """Test deprecation warning when task_logs is used""" - task_log = TaskLog(id="task-001", name="test") -======= request = WESRunRequest( workflow_params={ "reads": "https://storage.googleapis.com/sample-data/sample.fastq", @@ -560,18 +330,13 @@ def test_task_logs_deprecation_warning(self): stderr="https://storage.googleapis.com/logs/task123/stderr.log", exit_code=0 ) ->>>>>>> e2e7014 (feat: add WES models with unit tests) # Capture stdout to test the deprecation warning captured_output = io.StringIO() sys.stdout = captured_output WESData( -<<<<<<< HEAD - run_id="run-001", -======= run_id="wes-run-123", ->>>>>>> e2e7014 (feat: add WES models with unit tests) task_logs=[task_log] ) @@ -580,8 +345,4 @@ def test_task_logs_deprecation_warning(self): assert "DeprecationWarning" in output assert "task_logs" in output -<<<<<<< HEAD - assert "task_logs_url" in output # Fixed: was 'tes_logs_url', should be 'task_logs_url' -======= assert "task_logs_url" in output ->>>>>>> e2e7014 (feat: add WES models with unit tests) From b1ff782446cd486a521204ece4bf52d1ef3eb03a Mon Sep 17 00:00:00 2001 From: Karanjot786 Date: Tue, 18 Mar 2025 11:33:02 +0530 Subject: [PATCH 4/6] feat: add WES models with unit tests --- crategen/models/wes_models.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/crategen/models/wes_models.py b/crategen/models/wes_models.py index ed982fd..fc5d4d6 100644 --- a/crategen/models/wes_models.py +++ b/crategen/models/wes_models.py @@ -10,6 +10,9 @@ from pydantic import BaseModel, Field, root_validator, validator from rfc3339_validator import validate_rfc3339 # type: ignore +# Define the maximum number of system logs allowed +MAX_SYSTEM_LOGS = 2 + class WESState(str, Enum): """Enumeration of workflow states in the Workflow Execution Service (WES). @@ -72,7 +75,7 @@ class WESLog(BaseModel): - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. - **exit_code** (`Optional[int]`): The exit code of the program. - - **system_logs** (`optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. + - **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model """ @@ -95,6 +98,13 @@ def validate_datetime(cls, value, field): ) return value + @validator("system_logs", pre=True, always=True) + def truncate_system_logs(cls, value): + """Truncate the system_logs list to at most MAX_SYSTEM_LOGS entries.""" + if value is None: + return value + return value[:MAX_SYSTEM_LOGS] if len(value) > MAX_SYSTEM_LOGS else value + class WESTaskLog(WESLog): """ @@ -109,7 +119,7 @@ class WESTaskLog(WESLog): - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. - **exit_code** (`Optional[int]`): The exit code of the program. - - **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. + - **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. - **id** (`str`): A unique identifier which maybe used to reference the task. - **tes_uri** (`Optional[str]`): An optional URL pointing to an extended task definition defined by a TES api. From 35effda88928030887d5169e4bfc860761b092c2 Mon Sep 17 00:00:00 2001 From: Karanjot786 Date: Fri, 21 Mar 2025 19:43:39 +0530 Subject: [PATCH 5/6] feat: add WES models with unit tests --- tests/unit/test_wes_models.py | 56 +++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_wes_models.py b/tests/unit/test_wes_models.py index d64a9fe..1b116ff 100644 --- a/tests/unit/test_wes_models.py +++ b/tests/unit/test_wes_models.py @@ -133,6 +133,21 @@ def test_log_creation_with_all_fields(self, valid_datetime): assert log.exit_code == 0 assert len(log.system_logs) == MAX_SYSTEM_LOGS + def test_system_logs_truncation(self): + """Test that system_logs are truncated when exceeding MAX_SYSTEM_LOGS""" + excess_logs = [f"Log entry {i}" for i in range(MAX_SYSTEM_LOGS + 3)] # Create more logs than MAX_SYSTEM_LOGS + log = WESLog( + name="workflow_456", + system_logs=excess_logs + ) + + # Verify truncation + assert len(log.system_logs) == MAX_SYSTEM_LOGS + assert log.system_logs == excess_logs[:MAX_SYSTEM_LOGS] # Should keep only first MAX_SYSTEM_LOGS entries + assert "Log entry 0" in log.system_logs # First entry should be present + assert "Log entry 1" in log.system_logs # Second entry should be present + assert f"Log entry {MAX_SYSTEM_LOGS + 2}" not in log.system_logs # Last excess entry should be truncated + @pytest.mark.parametrize("invalid_datetime", invalid_datetime_strings) def test_log_datetime_validation(self, invalid_datetime): """Test datetime validation in WESLog""" @@ -308,7 +323,10 @@ def test_wes_data_complete(self): task_logs=[task_log], outputs={ "aligned_bam": "https://storage.googleapis.com/workflow-outputs/run123/aligned.bam", - "variants_vcf": "https://storage.googleapis.com/workflow-outputs/run123/variants.vcf" + "variants_vcf": "s3://my-bucket/outputs/variants.vcf", + "local_log": "file:///tmp/workflow123/run.log", + "metrics": "/absolute/path/to/metrics.txt", + "relative_output": "./results/summary.txt" } ) @@ -318,7 +336,41 @@ def test_wes_data_complete(self): assert wes_data.run_log == run_log assert wes_data.task_logs_url.startswith("https://") assert wes_data.task_logs == [task_log] - assert all(url.startswith("https://") for url in wes_data.outputs.values()) + + # Test different URL schemes in outputs + assert wes_data.outputs["aligned_bam"].startswith("https://") + assert wes_data.outputs["variants_vcf"].startswith("s3://") + assert wes_data.outputs["local_log"].startswith("file://") + assert wes_data.outputs["metrics"].startswith("/") + assert wes_data.outputs["relative_output"].startswith(".") + + def test_outputs_with_different_url_schemes(self): + """Test that WESData accepts outputs with different URL schemes and path types""" + output_urls = { + "http_url": "http://example.com/output.txt", + "https_url": "https://storage.googleapis.com/output.txt", + "s3_url": "s3://my-bucket/output.txt", + "gs_url": "gs://my-bucket/output.txt", + "file_url": "file:///local/path/output.txt", + "absolute_path": "/absolute/path/output.txt", + "relative_path": "./relative/path/output.txt" + } + + wes_data = WESData(run_id="wes-run-123", outputs=output_urls) + + # Verify all output URLs are preserved + assert wes_data.outputs["http_url"].startswith("http://") + assert wes_data.outputs["https_url"].startswith("https://") + assert wes_data.outputs["s3_url"].startswith("s3://") + assert wes_data.outputs["gs_url"].startswith("gs://") + assert wes_data.outputs["file_url"].startswith("file://") + assert wes_data.outputs["absolute_path"].startswith("/") + assert wes_data.outputs["relative_path"].startswith(".") + + # Verify we can retrieve all outputs + assert len(wes_data.outputs) == len(output_urls) + assert all(key in wes_data.outputs for key in output_urls) + assert all(wes_data.outputs[key] == value for key, value in output_urls.items()) def test_task_logs_deprecation_warning(self): """Test deprecation warning when task_logs is used""" From 85da875487c267d0cdf9e9cddc77faf95bf61d79 Mon Sep 17 00:00:00 2001 From: Karanjot786 Date: Wed, 26 Mar 2025 22:26:45 +0530 Subject: [PATCH 6/6] refactor: align WES models with GA4GH specification --- crategen/models/__init__.py | 22 +- crategen/models/wes_models.py | 208 ++++++------------ tests/unit/test_wes_models.py | 392 +++++++++------------------------- 3 files changed, 180 insertions(+), 442 deletions(-) diff --git a/crategen/models/__init__.py b/crategen/models/__init__.py index d39758f..0dcf03e 100644 --- a/crategen/models/__init__.py +++ b/crategen/models/__init__.py @@ -18,12 +18,11 @@ TESTaskLog, ) from .wes_models import ( - WESData, - WESLog, - WESOutputs, - WESRunRequest, - WESState, - WESTaskLog, + Log, + Run, + RunRequest, + State, + TaskLog, ) __all__ = [ @@ -40,10 +39,9 @@ "TESState", # WES Models - "WESState", - "WESOutputs", - "WESLog", - "WESTaskLog", - "WESRunRequest", - "WESData", + "State", + "Log", + "TaskLog", + "RunRequest", + "Run", ] diff --git a/crategen/models/wes_models.py b/crategen/models/wes_models.py index fc5d4d6..01c5f63 100644 --- a/crategen/models/wes_models.py +++ b/crategen/models/wes_models.py @@ -1,39 +1,27 @@ -""" -Each model in this module conforms to the corresponding WES model names as specified by the GA4GH schema (https://ga4gh.github.io/workflow-execution-service-schemas/docs/). +"""Each model in this module conforms to the corresponding WES model names as specified by the GA4GH schema (https://ga4gh.github.io/workflow-execution-service-schemas/docs/).""" -This module provides Pydantic models for the Workflow Execution Service (WES) schema, -supporting validation, serialization, and deserialization of WES data structures. -""" from enum import Enum from typing import List, Optional, Union from pydantic import BaseModel, Field, root_validator, validator from rfc3339_validator import validate_rfc3339 # type: ignore -# Define the maximum number of system logs allowed -MAX_SYSTEM_LOGS = 2 - - -class WESState(str, Enum): - """Enumeration of workflow states in the Workflow Execution Service (WES). - - These states represent the different stages a workflow can be in during its lifecycle. - - **Attributes:** - - - **UNKNOWN**: The state of the workflow is unknown. - - **QUEUED**: The workflow is queued. - - **INITIALIZING**: The workflow is initializing. - - **RUNNING**: The workflow is running. - - **PAUSED**: The workflow is paused. - - **COMPLETE**: The workflow has completed successfully. - - **EXECUTOR_ERROR**: The workflow encountered an executor error. - - **SYSTEM_ERROR**: The workflow encountered a system error. - - **CANCELLED**: The workflow was cancelled by the user. - - **CANCELING**: The workflow is in the process of being cancelled. - - **PREEMPTED**: The workflow was preempted by the system. - - **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/state_enum + +class State(str, Enum): + """Enumeration of workflow states. + + Attributes: + UNKNOWN: The state of the workflow is unknown. This provides a safe default for messages where this field is missing. + QUEUED: The workflow is queued. + INITIALIZING: The workflow is initializing. + RUNNING: The workflow is running. + PAUSED: The workflow is paused. + COMPLETE: The workflow has completed successfully. + EXECUTOR_ERROR: The workflow encountered an executor error. + SYSTEM_ERROR: The workflow encountered a system error. + CANCELED: The workflow was canceled by the user. + CANCELING: The workflow was canceled by the user, and is in the process of stopping. + PREEMPTED: The workflow is stopped (preempted) by the system. """ UNKNOWN = "UNKNOWN" QUEUED = "QUEUED" @@ -43,47 +31,29 @@ class WESState(str, Enum): COMPLETE = "COMPLETE" EXECUTOR_ERROR = "EXECUTOR_ERROR" SYSTEM_ERROR = "SYSTEM_ERROR" - CANCELLED = "CANCELLED" + CANCELED = "CANCELED" CANCELING = "CANCELING" PREEMPTED = "PREEMPTED" -class WESOutputs(BaseModel): - """Represents an output file or directory from a workflow run. - - **Attributes:** - - - **location** (`str`): The location (URL or path) where the output is stored. - - **name** (`str`): The name of the output file or directory. - - **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/outputs_model - """ - location: str - name: str - - -class WESLog(BaseModel): - """ - Represents a run log in the Workflow Execution Service (WES). - - **Attributes:** +class Log(BaseModel): + """Log information for a workflow run or task. - - **name** (`Optional[str]`): The task or workflow name. - - **cmd** (`Optional[list[str]]`): The command line that was executed. - - **start_time** (`Optional[str]`): When the command started executing, in RFC 3339 format. - - **end_time** (`Optional[str]`): When the command stopped executing, in RFC 3339 format. - - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. - - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. - - **exit_code** (`Optional[int]`): The exit code of the program. - - **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. - - **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model + Attributes: + name (`Optional[str]`): Task or workflow name + cmd (`Optional[List[str]]`): Command line executed + start_time (`Optional[str]`): When the task started executing (RFC 3339) + end_time (`Optional[str]`): When the task ended (RFC 3339) + stdout (`Optional[str]`): URL to retrieve standard output logs + stderr (`Optional[str]`): URL to retrieve standard error logs + exit_code (`Optional[int]`): Exit code of the program + system_logs (`Optional[List[str]]`): Any logs the system decides are relevant """ name: Optional[str] = None + cmd: Optional[List[str]] = None start_time: Optional[str] = None end_time: Optional[str] = None - cmd: Optional[List[str]] = None stdout: Optional[str] = None stderr: Optional[str] = None exit_code: Optional[int] = None @@ -94,36 +64,18 @@ def validate_datetime(cls, value, field): """Check correct datetime format is RFC 3339""" if value and not validate_rfc3339(value): raise ValueError( - f"The '{field.name}' property must be in the RFC 3339 format" + f"The '{field.name}' property must be in RFC 3339 format" ) return value - @validator("system_logs", pre=True, always=True) - def truncate_system_logs(cls, value): - """Truncate the system_logs list to at most MAX_SYSTEM_LOGS entries.""" - if value is None: - return value - return value[:MAX_SYSTEM_LOGS] if len(value) > MAX_SYSTEM_LOGS else value +class TaskLog(Log): + """Task execution log information. -class WESTaskLog(WESLog): - """ - Represents a task log in the Workflow Execution Service (WES). - - **Attributes:** - - - **name** (`str`): The task or workflow name. - - **cmd** (`Optional[list[str]]`): The command line that was executed. - - **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format. - - **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format. - - **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task. - - **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task. - - **exit_code** (`Optional[int]`): The exit code of the program. - - **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow. - - **id** (`str`): A unique identifier which maybe used to reference the task. - - **tes_uri** (`Optional[str]`): An optional URL pointing to an extended task definition defined by a TES api. - - **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model + Attributes: + id (`str`): Unique identifier which may be used to reference the task + tes_uri (`Optional[str]`): Optional URL pointing to an extended task definition defined by a TES API + name (`str`): REQUIRED The name of the task """ id: str @@ -131,23 +83,18 @@ class WESTaskLog(WESLog): name: str = Field(...) -class WESRunRequest(BaseModel): - """ - Represents a workflow request in WES. - - **Attributes:** - - - **workflow_params** (`Optional[dict[str, str]]`): The workflow run parameterizations (JSON encoded), - including input and output file locations. - - **workflow_type** (`str`): The workflow descriptor type. - - **workflow_type_version** (`str`): The workflow descriptor type version. - - **tags** (`Optional[dict[str, str]]`): Additional tags associated with the workflow. - - **workflow_engine_parameters** (Optional[dict[str, str]]): Input values specific to the workflow engine. - - **workflow_engine** (`Optional[str]`): The workflow engine. - - **workflow_engine_version** (`Optional[str]`): The workflow engine version. - - **workflow_url** (`str`): The workflow url. +class RunRequest(BaseModel): + """A workflow run request. - **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runrequest_model + Attributes: + workflow_params (`dict[str, str]`): REQUIRED The workflow run parameterizations (JSON encoded) + workflow_type (`str`): REQUIRED The workflow descriptor type (e.g., "CWL" or "WDL") + workflow_type_version (`str`): REQUIRED The workflow descriptor type version + tags (`Optional[dict[str, str]]`): Arbitrary key/value tags for the workflow + workflow_engine_parameters (`Optional[dict[str, str]]`): Workflow engine specific parameters + workflow_engine (`Optional[str]`): The workflow engine that should run this workflow + workflow_engine_version (`Optional[str]`): The version of the workflow engine + workflow_url (`str`): The workflow CWL or WDL document """ workflow_params: dict[str, str] @@ -161,19 +108,7 @@ class WESRunRequest(BaseModel): @root_validator() def validate_workflow_engine(cls, values): - """Validate workflow engine dependencies. - - If workflow_engine_version is set, then workflow_engine must also be set. - - Args: - values (dict): The model field values - - Returns: - dict: The validated field values - - Raises: - ValueError: If workflow_engine_version is set but workflow_engine is not - """ + """Validate workflow engine dependencies.""" engine_version = values.get("workflow_engine_version") engine = values.get("workflow_engine") if engine_version is not None and engine is None: @@ -183,45 +118,30 @@ def validate_workflow_engine(cls, values): return values -class WESData(BaseModel): - """ - Represents a WES run. - - **Attributes:** - - - **run_id** (`str`): The unique identifier for the WES run. - - **request** (`Optional[WESRunRequest]`): The request associated with the WES run. - - **state** (`Optional[WESState]`): The state of the WES run. - - **run_log** (`Optional[WESLog]`): The log of the WES run. - - **task_logs_url** (`Optional[str]`): A reference to the complete url which may be used - to obtain a paginated list of task logs for this workflow. - - **task_logs** (`Optional[list[WESLog | WESTaskLog] | None]`): The logs of individual tasks within the run. - This attribute is deprecated. - - **outputs** (`dict[str, str]`): The outputs of the WES run. +class Run(BaseModel): + """A workflow run. - **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/run_model + Attributes: + run_id (`str`): Workflow run ID + request (`Optional[RunRequest]`): The original workflow run request + state (`Optional[State]`): Current state of the workflow run + run_log (`Optional[Log]`): Log information about the workflow run + task_logs_url (`Optional[str]`): URL for obtaining task logs + task_logs (`Optional[List[Union[Log, TaskLog]]]`): DEPRECATED Task logs, use task_logs_url instead + outputs (`dict[str, str]`): Output files produced by the workflow run """ run_id: str - request: Optional[WESRunRequest] = None - state: Optional[WESState] = None - run_log: Optional[WESLog] = None + request: Optional[RunRequest] = None + state: Optional[State] = None + run_log: Optional[Log] = None task_logs_url: Optional[str] = None - task_logs: Optional[List[Union[WESLog, WESTaskLog]]] = None + task_logs: Optional[List[Union[Log, TaskLog]]] = None outputs: dict[str, str] = {} @root_validator def check_deprecated_fields(cls, values): - """Check for usage of deprecated fields and issue warnings. - - Currently warns about task_logs field which is deprecated in favor of task_logs_url. - - Args: - values (dict): The model field values - - Returns: - dict: The field values unchanged - """ + """Check for usage of deprecated fields.""" if values.get("task_logs") is not None: print( "DeprecationWarning: The 'task_logs' field is deprecated and will be removed in future versions. Use 'task_logs_url' instead." diff --git a/tests/unit/test_wes_models.py b/tests/unit/test_wes_models.py index 1b116ff..206cf54 100644 --- a/tests/unit/test_wes_models.py +++ b/tests/unit/test_wes_models.py @@ -5,12 +5,11 @@ import pytest from crategen.models.wes_models import ( - WESData, - WESLog, - WESOutputs, - WESRunRequest, - WESState, - WESTaskLog, + Log, + Run, + RunRequest, + State, + TaskLog, ) # Test data constants @@ -32,8 +31,6 @@ test_url = "https://raw.githubusercontent.com/elixir-cloud-aai/CrateGen/refs/heads/main/README.md" -MAX_SYSTEM_LOGS = 2 - # Test data samples test_workflow_params = { "reads": f"{test_url}/reads.fastq", @@ -54,118 +51,66 @@ } -class TestWESState: - """Test suite for WESState enum""" +class TestState: + """Test suite for State enum""" def test_state_enum_values(self): - """Test that WESState enum has correct values""" - assert WESState.UNKNOWN == "UNKNOWN" - assert WESState.QUEUED == "QUEUED" - assert WESState.INITIALIZING == "INITIALIZING" - assert WESState.RUNNING == "RUNNING" - assert WESState.PAUSED == "PAUSED" - assert WESState.COMPLETE == "COMPLETE" - assert WESState.EXECUTOR_ERROR == "EXECUTOR_ERROR" - assert WESState.SYSTEM_ERROR == "SYSTEM_ERROR" - assert WESState.CANCELLED == "CANCELLED" - assert WESState.CANCELING == "CANCELING" - assert WESState.PREEMPTED == "PREEMPTED" - - -class TestWESOutputs: - """Test suite for WESOutputs model""" - - def test_wes_outputs_creation(self): - """Test creating WESOutputs objects""" - # Test with real URLs - outputs = WESOutputs( - location="https://storage.googleapis.com/workflow-outputs/run123/result.bam", - name="alignment.bam" - ) - assert outputs.location.startswith("https://") - assert outputs.name == "alignment.bam" - - # Test with different output types - outputs = WESOutputs( - location="https://storage.googleapis.com/workflow-outputs/run123/variants.vcf", - name="variants.vcf" - ) - assert "variants.vcf" in outputs.location - assert outputs.name.endswith(".vcf") + """Test that State enum has correct values from GA4GH spec""" + assert State.UNKNOWN == "UNKNOWN" + assert State.QUEUED == "QUEUED" + assert State.INITIALIZING == "INITIALIZING" + assert State.RUNNING == "RUNNING" + assert State.PAUSED == "PAUSED" + assert State.COMPLETE == "COMPLETE" + assert State.EXECUTOR_ERROR == "EXECUTOR_ERROR" + assert State.SYSTEM_ERROR == "SYSTEM_ERROR" + assert State.CANCELED == "CANCELED" + assert State.CANCELING == "CANCELING" + assert State.PREEMPTED == "PREEMPTED" + + +class TestLog: + """Test suite for Log model""" + + def test_log_datetime_validation(self): + """Test datetime validation in Log""" + for valid_datetime in valid_datetime_strings: + log = Log( + name="workflow_123", + start_time=valid_datetime, + end_time=valid_datetime + ) + assert log.start_time == valid_datetime + assert log.end_time == valid_datetime + for invalid_datetime in invalid_datetime_strings: + with pytest.raises(ValueError) as exc_info: + Log( + name="workflow_123", + start_time=invalid_datetime + ) + assert "format" in str(exc_info.value) -class TestWESLog: - """Test suite for WESLog model""" - def test_log_creation_with_minimal_fields(self): - """Test creating WESLog with minimal fields""" - log = WESLog() - assert log.name is None - assert log.cmd is None - assert log.start_time is None - assert log.end_time is None - assert log.stdout is None - assert log.stderr is None - assert log.exit_code is None - assert log.system_logs is None +class TestTaskLog: + """Test suite for TaskLog model""" - @pytest.mark.parametrize("valid_datetime", valid_datetime_strings) - def test_log_creation_with_all_fields(self, valid_datetime): - """Test creating WESLog with all fields""" - log = WESLog( - name="workflow_123", - cmd=["cwltool", "--non-strict", test_url, test_url], - start_time=valid_datetime, - end_time=valid_datetime, - stdout="https://storage.googleapis.com/workflow-logs/run123/stdout.log", - stderr="https://storage.googleapis.com/workflow-logs/run123/stderr.log", - exit_code=0, - system_logs=[ - "Workflow engine initialized", - "Downloading CWL workflow from GitHub", - "Execution completed successfully" - ] - ) - assert log.name == "workflow_123" - assert test_url in log.cmd - assert log.stdout.startswith("https://") - assert log.stderr.startswith("https://") - assert log.exit_code == 0 - assert len(log.system_logs) == MAX_SYSTEM_LOGS + def test_task_log_required_fields(self): + """Test that required fields must be provided""" + with pytest.raises(ValueError): + TaskLog(id="task-123") # Missing required name field - def test_system_logs_truncation(self): - """Test that system_logs are truncated when exceeding MAX_SYSTEM_LOGS""" - excess_logs = [f"Log entry {i}" for i in range(MAX_SYSTEM_LOGS + 3)] # Create more logs than MAX_SYSTEM_LOGS - log = WESLog( - name="workflow_456", - system_logs=excess_logs + # Test with required fields + task_log = TaskLog( + id="task-123", + name="alignment" ) - - # Verify truncation - assert len(log.system_logs) == MAX_SYSTEM_LOGS - assert log.system_logs == excess_logs[:MAX_SYSTEM_LOGS] # Should keep only first MAX_SYSTEM_LOGS entries - assert "Log entry 0" in log.system_logs # First entry should be present - assert "Log entry 1" in log.system_logs # Second entry should be present - assert f"Log entry {MAX_SYSTEM_LOGS + 2}" not in log.system_logs # Last excess entry should be truncated - - @pytest.mark.parametrize("invalid_datetime", invalid_datetime_strings) - def test_log_datetime_validation(self, invalid_datetime): - """Test datetime validation in WESLog""" - with pytest.raises(ValueError) as exc_info: - WESLog( - name="workflow_123", - start_time=invalid_datetime - ) - assert "format" in str(exc_info.value) - + assert task_log.id == "task-123" + assert task_log.name == "alignment" -class TestWESTaskLog: - """Test suite for WESTaskLog model""" - - def test_task_log_creation(self): - """Test creating WESTaskLog objects""" - # Test with required fields and real URLs - task_log = WESTaskLog( + def test_task_log_all_fields(self): + """Test TaskLog with all fields""" + task_log = TaskLog( id="task-bwa-mem-123", name="bwa_mem_alignment", cmd=["bwa", "mem", "-t", "4", "reference.fa", "reads.fastq"], @@ -180,172 +125,84 @@ def test_task_log_creation(self): assert task_log.stderr.startswith("https://") assert task_log.tes_uri == test_url - def test_task_log_name_required(self): - """Test that name is required for WESTaskLog""" - with pytest.raises(ValueError): - WESTaskLog(id="task-123") +class TestRunRequest: + """Test suite for RunRequest model""" -class TestWESRunRequest: - """Test suite for WESRunRequest model""" + def test_run_request_required_fields(self): + """Test that required fields must be provided""" + with pytest.raises(ValueError): + RunRequest(workflow_type="CWL") # Missing other required fields - def test_run_request_minimal(self): - """Test creating WESRunRequest with minimal fields""" - request = WESRunRequest( - workflow_params={ - "input_reads": "https://storage.googleapis.com/sample-data/reads.fastq", - "output_path": "s3://my-bucket/results/" - }, + request = RunRequest( + workflow_params={"input": "test.txt"}, workflow_type="CWL", workflow_type_version="v1.0", workflow_url=test_url ) assert request.workflow_type == "CWL" assert request.workflow_url == test_url - assert request.workflow_params["input_reads"].startswith("https://") - assert request.workflow_params["output_path"].startswith("s3://") - - def test_run_request_all_fields(self): - """Test creating WESRunRequest with all fields""" - request = WESRunRequest( - workflow_params={ - "reads": "https://storage.googleapis.com/sample-data/tumor.bam", - "reference": "https://storage.googleapis.com/references/hg38.fa", - "output_dir": "s3://my-bucket/variant-calls/" - }, - workflow_type="WDL", - workflow_type_version="1.0", - workflow_url="https://raw.githubusercontent.com/broadinstitute/gatk-workflows/master/mutect2.wdl", - tags={ - "project": "cancer-genomics", - "sample": "TCGA-AB-2823", - "analysis": "somatic-variant-calling" - }, - workflow_engine_parameters={ - "memory": "16GB", - "cpu": "4", - "disk_size": "100GB" - }, - workflow_engine="cromwell", - workflow_engine_version="52.0.1" - ) - assert request.workflow_type == "WDL" - assert request.workflow_url.endswith(".wdl") - assert request.tags["project"] == "cancer-genomics" - assert request.workflow_engine == "cromwell" def test_workflow_engine_validation(self): """Test workflow engine validation rules""" - # Valid: No version specified - request = WESRunRequest( - workflow_params={}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url=test_url - ) - assert request.workflow_engine is None - assert request.workflow_engine_version is None + # Version without engine should fail + with pytest.raises(ValueError) as exc_info: + RunRequest( + workflow_params={}, + workflow_type="CWL", + workflow_type_version="v1.0", + workflow_url=test_url, + workflow_engine_version="3.1.0" + ) + assert "workflow_engine" in str(exc_info.value) - # Valid: Both engine and version specified - request = WESRunRequest( + # Both engine and version should work + request = RunRequest( workflow_params={}, workflow_type="CWL", workflow_type_version="v1.0", workflow_url=test_url, workflow_engine="cwltool", - workflow_engine_version="3.1.20220502201013" + workflow_engine_version="3.1.0" ) assert request.workflow_engine == "cwltool" - assert request.workflow_engine_version.startswith("3.1") + assert request.workflow_engine_version == "3.1.0" - # Invalid: Version without engine - with pytest.raises(ValueError) as exc_info: - WESRunRequest( - workflow_params={}, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url=test_url, - workflow_engine_version="3.1.20220502201013" - ) - assert "workflow_engine" in str(exc_info.value) -class TestWESData: - """Test suite for WESData model""" +class TestRun: + """Test suite for Run model""" - def test_wes_data_minimal(self): - """Test creating WESData with minimal fields""" - wes_data = WESData(run_id="wes-run-123") - assert wes_data.run_id.startswith("wes-") - assert wes_data.request is None - assert wes_data.state is None - assert wes_data.run_log is None - assert wes_data.task_logs_url is None - assert wes_data.task_logs is None - assert wes_data.outputs == {} + def test_run_required_fields(self): + """Test that required fields must be provided""" + run = Run(run_id="run-123") + assert run.run_id == "run-123" + assert run.outputs == {} - def test_wes_data_complete(self): - """Test creating WESData with all fields""" - request = WESRunRequest( - workflow_params={ - "reads": "https://storage.googleapis.com/sample-data/sample.fastq", - "output_dir": "s3://my-bucket/results/" - }, - workflow_type="CWL", - workflow_type_version="v1.0", - workflow_url=test_url - ) - - run_log = WESLog( - name="variant_calling_workflow", - cmd=["cwltool", "--non-strict", test_url, test_url], - stdout="https://storage.googleapis.com/workflow-logs/run123/stdout.log", - stderr="https://storage.googleapis.com/workflow-logs/run123/stderr.log", - exit_code=0, - system_logs=["Workflow started", "Workflow completed"] - ) - - task_log = WESTaskLog( + def test_task_logs_deprecation(self): + """Test deprecation warning for task_logs field""" + task_log = TaskLog( id="task-123", - name="variant_calling", - cmd=["gatk", "HaplotypeCaller", "-R", "reference.fa", "-I", "input.bam"], - stdout="https://storage.googleapis.com/workflow-logs/task123/stdout.log", - stderr="https://storage.googleapis.com/workflow-logs/task123/stderr.log", - exit_code=0, - tes_uri=test_url + name="alignment" ) - wes_data = WESData( - run_id="wes-run-123", - request=request, - state=WESState.COMPLETE, - run_log=run_log, - task_logs_url="https://storage.googleapis.com/workflow-logs/run123/tasks/", - task_logs=[task_log], - outputs={ - "aligned_bam": "https://storage.googleapis.com/workflow-outputs/run123/aligned.bam", - "variants_vcf": "s3://my-bucket/outputs/variants.vcf", - "local_log": "file:///tmp/workflow123/run.log", - "metrics": "/absolute/path/to/metrics.txt", - "relative_output": "./results/summary.txt" - } + # Capture stdout to test deprecation warning + captured_output = io.StringIO() + sys.stdout = captured_output + + Run( + run_id="run-123", + task_logs=[task_log] ) - assert wes_data.run_id == "wes-run-123" - assert wes_data.request == request - assert wes_data.state == WESState.COMPLETE - assert wes_data.run_log == run_log - assert wes_data.task_logs_url.startswith("https://") - assert wes_data.task_logs == [task_log] + sys.stdout = sys.__stdout__ + output = captured_output.getvalue() - # Test different URL schemes in outputs - assert wes_data.outputs["aligned_bam"].startswith("https://") - assert wes_data.outputs["variants_vcf"].startswith("s3://") - assert wes_data.outputs["local_log"].startswith("file://") - assert wes_data.outputs["metrics"].startswith("/") - assert wes_data.outputs["relative_output"].startswith(".") + assert "DeprecationWarning" in output + assert "task_logs" in output + assert "task_logs_url" in output - def test_outputs_with_different_url_schemes(self): - """Test that WESData accepts outputs with different URL schemes and path types""" + def test_run_output_urls(self): + """Test Run accepts outputs with different URL schemes""" output_urls = { "http_url": "http://example.com/output.txt", "https_url": "https://storage.googleapis.com/output.txt", @@ -356,45 +213,8 @@ def test_outputs_with_different_url_schemes(self): "relative_path": "./relative/path/output.txt" } - wes_data = WESData(run_id="wes-run-123", outputs=output_urls) + run = Run(run_id="run-123", outputs=output_urls) # Verify all output URLs are preserved - assert wes_data.outputs["http_url"].startswith("http://") - assert wes_data.outputs["https_url"].startswith("https://") - assert wes_data.outputs["s3_url"].startswith("s3://") - assert wes_data.outputs["gs_url"].startswith("gs://") - assert wes_data.outputs["file_url"].startswith("file://") - assert wes_data.outputs["absolute_path"].startswith("/") - assert wes_data.outputs["relative_path"].startswith(".") - - # Verify we can retrieve all outputs - assert len(wes_data.outputs) == len(output_urls) - assert all(key in wes_data.outputs for key in output_urls) - assert all(wes_data.outputs[key] == value for key, value in output_urls.items()) - - def test_task_logs_deprecation_warning(self): - """Test deprecation warning when task_logs is used""" - task_log = WESTaskLog( - id="task-123", - name="alignment", - cmd=["bwa", "mem", "ref.fa", "reads.fastq"], - stdout="https://storage.googleapis.com/logs/task123/stdout.log", - stderr="https://storage.googleapis.com/logs/task123/stderr.log", - exit_code=0 - ) - - # Capture stdout to test the deprecation warning - captured_output = io.StringIO() - sys.stdout = captured_output - - WESData( - run_id="wes-run-123", - task_logs=[task_log] - ) - - sys.stdout = sys.__stdout__ - output = captured_output.getvalue() - - assert "DeprecationWarning" in output - assert "task_logs" in output - assert "task_logs_url" in output + for key, value in output_urls.items(): + assert run.outputs[key] == value