From e7a48fb31e3398655d84780376230920cef2f552 Mon Sep 17 00:00:00 2001 From: Mark Wiebe <399551+mwiebe@users.noreply.github.com> Date: Tue, 18 Mar 2025 15:36:15 -0700 Subject: [PATCH] fix!: Running jobs with env templates does not support parameters The Open Job Description specification https://github.com/OpenJobDescription/openjd-specifications/wiki/2023-09-Template-Schemas includes both job and environment templates. The openjd CLI lets you run a job in the context of one or more externally defined templates via `openjd run job.yaml --environment env.yaml`. Environment templates can define parameters just like jobs, but when they did this the CLI tool failed to run them. This was because the parameter values were not being processed and forwarded based on the full environment context, they were being regenerated later without the environments. This change fixes that to forward the parameters. BREAKING CHANGE: The functions generate_jobs and job_from_template have changed to accept the environments and return the job parameters alongside the job. Constructing the LocalSession class now requires job parameters. Signed-off-by: Mark Wiebe <399551+mwiebe@users.noreply.github.com> --- src/openjd/cli/_common/__init__.py | 11 +++- src/openjd/cli/_common/_job_from_template.py | 26 +++++--- .../_run/_local_session/_session_manager.py | 19 ++---- src/openjd/cli/_run/_run_command.py | 8 ++- src/openjd/cli/_summary/_summary_command.py | 2 +- test/openjd/cli/conftest.py | 3 +- test/openjd/cli/templates/env_with_param.yaml | 19 ++++++ .../cli/templates/simple_with_j_param.yaml | 2 +- test/openjd/cli/test_common.py | 7 +- test/openjd/cli/test_local_session.py | 24 ++++--- test/openjd/cli/test_run_with_env.py | 64 +++++++++++++++++++ 11 files changed, 148 insertions(+), 37 deletions(-) create mode 100644 test/openjd/cli/templates/env_with_param.yaml create mode 100644 test/openjd/cli/test_run_with_env.py diff --git a/src/openjd/cli/_common/__init__.py b/src/openjd/cli/_common/__init__.py index ee9d9fd..5a51bee 100644 --- a/src/openjd/cli/_common/__init__.py +++ b/src/openjd/cli/_common/__init__.py @@ -21,7 +21,7 @@ read_job_template, read_environment_template, ) -from openjd.model import DecodeValidationError, Job +from openjd.model import DecodeValidationError, Job, JobParameterValues, EnvironmentTemplate __all__ = [ "add_extensions_argument", @@ -112,13 +112,20 @@ def add(self, name: str, description: str, **kwargs) -> ArgumentParser: return self.group.add_parser(name, **kwargs) -def generate_job(args: Namespace, *, supported_extensions: list[str]) -> Job: +def generate_job( + args: Namespace, + environments: list[EnvironmentTemplate] = [], + *, + supported_extensions: list[str], +) -> tuple[Job, JobParameterValues]: try: # Raises: RuntimeError, DecodeValidationError template = read_job_template(args.path, supported_extensions=supported_extensions) + # Raises: RuntimeError return job_from_template( template, + environments, args.job_params if args.job_params else None, Path(os.path.abspath(args.path.parent)), Path(os.getcwd()), diff --git a/src/openjd/cli/_common/_job_from_template.py b/src/openjd/cli/_common/_job_from_template.py index 9f6f4cb..11727d4 100644 --- a/src/openjd/cli/_common/_job_from_template.py +++ b/src/openjd/cli/_common/_job_from_template.py @@ -4,14 +4,16 @@ import json from pathlib import Path import re -from typing import Union +from typing import Optional, Union import yaml from ._validation_utils import get_doc_type from openjd.model import ( DecodeValidationError, DocumentType, + EnvironmentTemplate, Job, + JobParameterValues, JobTemplate, create_job, preprocess_job_parameters, @@ -54,7 +56,7 @@ def get_params_from_file(parameter_string: str) -> Union[dict, list]: return parameters -def get_job_params(parameter_args: list[str]) -> dict: +def get_job_params(parameter_args: Optional[list[str]]) -> dict: """ Resolves Job Parameters from a list of command-line arguments. Arguments may be a filepath or a string with format 'Key=Value'. @@ -62,7 +64,8 @@ def get_job_params(parameter_args: list[str]) -> dict: Raises: RuntimeError if the provided Parameters are formatted incorrectly or can't be opened """ parameter_dict: dict = {} - for arg in parameter_args: + + for arg in parameter_args or []: arg = arg.strip() # Case 1: Provided argument is a filepath if arg.startswith("file://"): @@ -104,17 +107,18 @@ def get_job_params(parameter_args: list[str]) -> dict: def job_from_template( template: JobTemplate, + environments: list[EnvironmentTemplate], parameter_args: list[str] | None, job_template_dir: Path, current_working_dir: Path, -) -> Job: +) -> tuple[Job, JobParameterValues]: """ - Given a decoded Job Template and a user-inputted parameter dictionary, - generates a Job object. + Given a decoded Job Template and a user-input parameter dictionary, + generates a Job object and the parameter values for running the job. Raises: RuntimeError if parameters are an unsupported type or don't correspond to the template """ - parameter_dict = get_job_params(parameter_args) if parameter_args else {} + parameter_dict = get_job_params(parameter_args) try: parameter_values = preprocess_job_parameters( @@ -122,11 +126,17 @@ def job_from_template( job_parameter_values=parameter_dict, job_template_dir=job_template_dir, current_working_dir=current_working_dir, + environment_templates=environments, ) except ValueError as ve: raise RuntimeError(str(ve)) try: - return create_job(job_template=template, job_parameter_values=parameter_values) + job = create_job( + job_template=template, + job_parameter_values=parameter_values, + environment_templates=environments, + ) + return (job, parameter_values) except DecodeValidationError as dve: raise RuntimeError(f"Could not generate Job from template and parameters: {str(dve)}") diff --git a/src/openjd/cli/_run/_local_session/_session_manager.py b/src/openjd/cli/_run/_local_session/_session_manager.py index feb74a8..3d847ba 100644 --- a/src/openjd/cli/_run/_local_session/_session_manager.py +++ b/src/openjd/cli/_run/_local_session/_session_manager.py @@ -21,8 +21,6 @@ IntRangeExpr, Job, JobParameterValues, - ParameterValue, - ParameterValueType, Step, StepParameterSpaceIterator, TaskParameterSet, @@ -78,6 +76,7 @@ def __init__( self, *, job: Job, + job_parameter_values: JobParameterValues, session_id: str, timestamp_format: LoggingTimestampFormat = LoggingTimestampFormat.RELATIVE, path_mapping_rules: Optional[list[PathMappingRule]] = None, @@ -93,18 +92,9 @@ def __init__( self._environments = environments # Create an OpenJD Session - job_parameters: JobParameterValues - if job.parameters: - job_parameters = { - name: ParameterValue(type=ParameterValueType(param.type.value), value=param.value) - for name, param in job.parameters.items() - } - else: - job_parameters = dict[str, ParameterValue]() - self._openjd_session = Session( session_id=self.session_id, - job_parameter_values=job_parameters, + job_parameter_values=job_parameter_values, path_mapping_rules=self._path_mapping_rules, callback=self._action_callback, retain_working_dir=retain_working_dir, @@ -131,6 +121,11 @@ def _context_manager_cleanup(self): signal(SIGTERM, SIG_DFL) self._started = False + # A blank line to separate the job log output from this status message + LOG.info( + msg="", + extra={"session_id": self.session_id}, + ) if self.failed: LOG.info( msg=f"Open Job Description CLI: ERROR executing action: '{self.failed_action}' (see Task logs for details)", diff --git a/src/openjd/cli/_run/_run_command.py b/src/openjd/cli/_run/_run_command.py index 7c3b891..a113d07 100644 --- a/src/openjd/cli/_run/_run_command.py +++ b/src/openjd/cli/_run/_run_command.py @@ -28,6 +28,7 @@ DecodeValidationError, EnvironmentTemplate, Job, + JobParameterValues, Step, StepDependencyGraph, StepParameterSpaceIterator, @@ -317,6 +318,7 @@ def _validate_task_params(step: Step, task_params: list[dict[str, str]]) -> None def _run_local_session( *, job: Job, + job_parameter_values: JobParameterValues, step_list: list[Step], selected_step: Optional[Step], timestamp_format: LoggingTimestampFormat, @@ -337,6 +339,7 @@ def _run_local_session( step_name = "" with LocalSession( job=job, + job_parameter_values=job_parameter_values, timestamp_format=timestamp_format, session_id="CLI-session", path_mapping_rules=path_mapping_rules, @@ -442,7 +445,9 @@ def do_run(args: Namespace) -> OpenJDCliResult: try: # Raises: RuntimeError - the_job = generate_job(args, supported_extensions=extensions) + the_job, job_parameter_values = generate_job( + args, environments, supported_extensions=extensions + ) # Map Step names to Step objects so they can be easily accessed step_map = {step.name: step for step in the_job.steps} @@ -509,6 +514,7 @@ def do_run(args: Namespace) -> OpenJDCliResult: return _run_local_session( job=the_job, + job_parameter_values=job_parameter_values, step_list=step_list, selected_step=selected_step, task_parameter_values=task_parameter_values, diff --git a/src/openjd/cli/_summary/_summary_command.py b/src/openjd/cli/_summary/_summary_command.py index 227c81b..ebcb52a 100644 --- a/src/openjd/cli/_summary/_summary_command.py +++ b/src/openjd/cli/_summary/_summary_command.py @@ -35,7 +35,7 @@ def do_summary(args: Namespace) -> OpenJDCliResult: try: # Raises: RuntimeError - sample_job = generate_job(args, supported_extensions=extensions) + sample_job, _ = generate_job(args, supported_extensions=extensions) except RuntimeError as rte: return OpenJDCliResult(status="error", message=str(rte)) diff --git a/test/openjd/cli/conftest.py b/test/openjd/cli/conftest.py index b0ee3ce..111ca35 100644 --- a/test/openjd/cli/conftest.py +++ b/test/openjd/cli/conftest.py @@ -29,8 +29,9 @@ def sample_job_and_dirs(request): template = decode_job_template(template=MOCK_TEMPLATE) yield ( - job_from_template( + *job_from_template( template=template, + environments=[], parameter_args=request.param, job_template_dir=template_dir, current_working_dir=current_working_dir, diff --git a/test/openjd/cli/templates/env_with_param.yaml b/test/openjd/cli/templates/env_with_param.yaml new file mode 100644 index 0000000..2214c26 --- /dev/null +++ b/test/openjd/cli/templates/env_with_param.yaml @@ -0,0 +1,19 @@ +specificationVersion: environment-2023-09 +parameterDefinitions: + - name: EnvParam + type: STRING + default: DefaultForEnvParam +environment: + name: EnvWithParam + script: + actions: + onEnter: + command: python + args: + - -c + - print('EnvWithParam Enter {{Param.EnvParam}}') + onExit: + command: python + args: + - -c + - print('EnvWithParam Exit {{Param.EnvParam}}') diff --git a/test/openjd/cli/templates/simple_with_j_param.yaml b/test/openjd/cli/templates/simple_with_j_param.yaml index a5ce128..9f7875a 100644 --- a/test/openjd/cli/templates/simple_with_j_param.yaml +++ b/test/openjd/cli/templates/simple_with_j_param.yaml @@ -9,7 +9,7 @@ "actions": { "onRun": { "command": "python", - "args": ["-c", "print('DoTask')"], + "args": ["-c", "print('DoTask {{Param.J}}')"], } } }, diff --git a/test/openjd/cli/test_common.py b/test/openjd/cli/test_common.py index 969a60a..ad21193 100644 --- a/test/openjd/cli/test_common.py +++ b/test/openjd/cli/test_common.py @@ -366,7 +366,7 @@ def test_job_from_template_success( template_dir, current_working_dir = template_dir_and_cwd template = decode_job_template(template=template_dict) - result = job_from_template(template, mock_params, template_dir, current_working_dir) + result, _ = job_from_template(template, [], mock_params, template_dir, current_working_dir) assert result.name == expected_job_name assert [step.model_dump(exclude_none=True) for step in result.steps] == [ step.model_dump(exclude_none=True) for step in template.steps @@ -415,7 +415,7 @@ def test_job_from_template_error( template = decode_job_template(template=template_dict) with pytest.raises(RuntimeError) as rte: - job_from_template(template, mock_params, template_dir, current_working_dir) + job_from_template(template, [], mock_params, template_dir, current_working_dir) assert expected_error in str(rte.value) @@ -455,8 +455,9 @@ def test_generate_job_success( new=Mock(side_effect=job_from_template), ) as patched_job_from_template: generate_job(mock_args, supported_extensions=[]) + print(patched_job_from_template.call_args_list) patched_job_from_template.assert_called_once_with( - ANY, expected_param_list, Path(temp_template.name).parent, Path(os.getcwd()) + ANY, [], expected_param_list, Path(temp_template.name).parent, Path(os.getcwd()) ) Path(temp_template.name).unlink() diff --git a/test/openjd/cli/test_local_session.py b/test/openjd/cli/test_local_session.py index 081cee0..2a23847 100644 --- a/test/openjd/cli/test_local_session.py +++ b/test/openjd/cli/test_local_session.py @@ -60,7 +60,7 @@ def test_localsession_initialize( """ Test that initializing the local Session enters external and job environments, and is ready to run tasks. """ - sample_job, template_dir, current_working_dir = sample_job_and_dirs + sample_job, sample_job_parameters, template_dir, current_working_dir = sample_job_and_dirs with ( patch.object( LocalSession, @@ -72,7 +72,9 @@ def test_localsession_initialize( LocalSession, "run_step", autospec=True, side_effect=LocalSession.run_step ) as patched_run_step, ): - with LocalSession(job=sample_job, session_id="my-session") as session: + with LocalSession( + job=sample_job, job_parameter_values=sample_job_parameters, session_id="my-session" + ) as session: assert session._openjd_session.state == SessionState.READY # It should have entered the external and job environments in order @@ -91,12 +93,14 @@ def test_localsession_initialize( @pytest.mark.usefixtures("sample_job_and_dirs") def test_localsession_traps_sigint(sample_job_and_dirs: tuple): # Make sure that we hook up, and remove the signal handler when using the local session - sample_job, template_dir, current_working_dir = sample_job_and_dirs + sample_job, sample_job_parameters, template_dir, current_working_dir = sample_job_and_dirs # GIVEN with patch.object(local_session_mod, "signal") as signal_mod: # WHEN - with LocalSession(job=sample_job, session_id="test-id") as localsession: + with LocalSession( + job=sample_job, job_parameter_values=sample_job_parameters, session_id="test-id" + ) as localsession: pass # THEN @@ -124,7 +128,7 @@ def test_localsession_run_success( """ Test that calling `run_step` causes the local Session to run the tasks requested in that step. """ - sample_job, template_dir, current_working_dir = sample_job_and_dirs + sample_job, sample_job_parameters, template_dir, current_working_dir = sample_job_and_dirs if parameter_sets is None: parameter_sets = StepParameterSpaceIterator( @@ -151,7 +155,9 @@ def test_localsession_run_success( LocalSession, "run_task", autospec=True, side_effect=LocalSession.run_task ) as patched_run_task, ): - with LocalSession(job=sample_job, session_id="my-session") as session: + with LocalSession( + job=sample_job, job_parameter_values=sample_job_parameters, session_id="my-session" + ) as session: session.run_step( sample_job.steps[step_index], task_parameters=parameter_sets, @@ -192,7 +198,7 @@ def test_localsession_run_failed(sample_job_and_dirs: tuple, capsys: pytest.Capt """ Test that a LocalSession can gracefully handle an error in its inner Session. """ - sample_job, template_dir, current_working_dir = sample_job_and_dirs + sample_job, sample_job_parameters, template_dir, current_working_dir = sample_job_and_dirs with ( patch.object( LocalSession, @@ -201,7 +207,9 @@ def test_localsession_run_failed(sample_job_and_dirs: tuple, capsys: pytest.Capt side_effect=LocalSession.run_environment_enters, ) as patched_run_environment_enters, ): - with LocalSession(job=sample_job, session_id="bad-session") as session: + with LocalSession( + job=sample_job, job_parameter_values=sample_job_parameters, session_id="bad-session" + ) as session: with pytest.raises(LocalSessionFailed): session.run_step(sample_job.steps[SampleSteps.BadCommand]) diff --git a/test/openjd/cli/test_run_with_env.py b/test/openjd/cli/test_run_with_env.py new file mode 100644 index 0000000..2adf049 --- /dev/null +++ b/test/openjd/cli/test_run_with_env.py @@ -0,0 +1,64 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +from pathlib import Path +import re + +from . import run_openjd_cli_main, format_capsys_outerr + +TEMPLATE_DIR = Path(__file__).parent / "templates" + + +def test_run_job_with_env_default_params(capsys): + # Run a job with env_with_param as an external environment, + # leaving the environment's parameter at its default value + + outerr = run_openjd_cli_main( + capsys, + args=[ + "run", + str(TEMPLATE_DIR / "simple_with_j_param.yaml"), + "-p", + "J=Jvalue", + "--environment", + str(TEMPLATE_DIR / "env_with_param.yaml"), + ], + expected_exit_code=0, + ) + + for expected_message_regex in [ + "EnvWithParam Enter DefaultForEnvParam", + "DoTask Jvalue", + "EnvWithParam Exit DefaultForEnvParam", + ]: + assert re.search( + expected_message_regex, outerr.out + ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}" + + +def test_run_job_with_env_provide_env_param(capsys): + # Run a job with env_with_param as an external environment, + # explicitly providing the env parameter + + outerr = run_openjd_cli_main( + capsys, + args=[ + "run", + str(TEMPLATE_DIR / "simple_with_j_param.yaml"), + "-p", + "J=Jvalue", + "-p", + "EnvParam=EnvParamValue", + "--environment", + str(TEMPLATE_DIR / "env_with_param.yaml"), + ], + expected_exit_code=0, + ) + + for expected_message_regex in [ + "EnvWithParam Enter EnvParamValue", + "DoTask Jvalue", + "EnvWithParam Exit EnvParamValue", + ]: + assert re.search( + expected_message_regex, outerr.out + ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}"