From 1e6c05b665d956a2a276440dcbe4aaffb2740645 Mon Sep 17 00:00:00 2001 From: Mark <399551+mwiebe@users.noreply.github.com> Date: Wed, 5 Nov 2025 11:52:21 -0800 Subject: [PATCH] feat: Add run_subprocess function to the Session class This new function provides the ability to run an ad hoc command within the session environment, choosing whether to use the environment variables from the session enter actions or to ignore them. The parameters for the command are direct, so you don't have to construct a temporary Action or StepAction object to run a simple command. Signed-off-by: Mark <399551+mwiebe@users.noreply.github.com> --- src/openjd/sessions/_session.py | 123 ++ .../sessions/test_session_run_subprocess.py | 1113 +++++++++++++++++ 2 files changed, 1236 insertions(+) create mode 100644 test/openjd/sessions/test_session_run_subprocess.py diff --git a/src/openjd/sessions/_session.py b/src/openjd/sessions/_session.py index 6cdac0d3..043f491a 100644 --- a/src/openjd/sessions/_session.py +++ b/src/openjd/sessions/_session.py @@ -27,6 +27,13 @@ ) from openjd.model import version as model_version from openjd.model.v2023_09 import ( + Action as Action_2023_09, + ArgString as ArgString_2023_09, + CancelationMethodTerminate as CancelationMethodTerminate_2023_09, + CancelationMode as CancelationMode_2023_09, + CommandString as CommandString_2023_09, + StepActions as StepActions_2023_09, + StepScript as StepScript_2023_09, ValueReferenceConstants as ValueReferenceConstants_2023_09, ) from ._action_filter import ActionMessageKind, ActionMonitoringFilter @@ -900,6 +907,122 @@ def _run_task_without_session_env( # than after -- run() itself may end up setting the action state to FAILED. self._runner.run() + def run_subprocess( + self, + *, + command: str, + args: Optional[list[str]] = None, + timeout: Optional[int] = None, + os_env_vars: Optional[dict[str, str]] = None, + use_session_env_vars: bool = True, + log_banner_message: Optional[str] = None, + ) -> None: + """Run an ad-hoc subprocess within the Session. + + This method is non-blocking; it will exit when the subprocess is either + confirmed to have started running, or has failed to be started. + + Arguments: + command (str): The command/executable to run. Used exactly as provided + without format string substitution. + args (Optional[list[str]]): Arguments to pass to the command. Used exactly + as provided without format string substitution. Defaults to None. + timeout (Optional[int]): Maximum allowed runtime of the subprocess in seconds. + Must be a positive integer if provided. If None, the subprocess can run + indefinitely. Defaults to None. + os_env_vars (Optional[dict[str, str]]): Additional OS environment variables + to inject into the subprocess. Values provided override original process + environment variables and are overridden by environment-defined variables. + use_session_env_vars (bool): If True, includes environment variables from + the session and entered environments. If False, only uses os_env_vars + and original process environment variables. Defaults to True. + log_banner_message (Optional[str]): Custom message to display in a banner + before running the subprocess. If provided, logs a banner with this message. + If None, no banner is logged. Defaults to None. + + Raises: + RuntimeError: If the Session is not in the READY state. + ValueError: If timeout is provided and is not a positive integer, or if command is empty. + """ + # State validation + if self.state != SessionState.READY: + raise RuntimeError( + f"Session must be in the READY state to run a subprocess. " + f"Current state: {self.state.value}" + ) + + # Parameter validation + if timeout is not None and timeout <= 0: + raise ValueError("timeout must be a positive integer") + + if not command or not command.strip(): + raise ValueError("command must be a non-empty string") + + # Log banner if requested + if log_banner_message: + log_section_banner(self._logger, log_banner_message) + + # Reset action state + self._reset_action_state() + + # Construct Action model + cancelation = CancelationMethodTerminate_2023_09(mode=CancelationMode_2023_09.TERMINATE) + + action_command = CommandString_2023_09(command) + action_args = [ArgString_2023_09(arg) for arg in args] if args else None + + action = Action_2023_09( + command=action_command, + args=action_args, + timeout=timeout, + cancelation=cancelation, + ) + + # Construct StepScript model + step_actions = StepActions_2023_09(onRun=action) + + step_script = StepScript_2023_09( + actions=step_actions, + embeddedFiles=None, + ) + + # Create empty symbol table (no format string substitution for ad-hoc subprocesses) + symtab = SymbolTable() + + # Evaluate environment variables + if use_session_env_vars: + action_env_vars = self._evaluate_current_session_env_vars(os_env_vars) + else: + action_env_vars = dict[str, Optional[str]](self._process_env) # Make a copy + if os_env_vars: + action_env_vars.update(**os_env_vars) + + # Note: Path mapping is not materialized for ad-hoc subprocesses since it's only + # accessible via template variable substitution (e.g., {{Session.PathMappingRulesFile}}), + # which is explicitly disabled for run_subprocess to ensure predictable behavior. + + # Create and start StepScriptRunner + self._runner = StepScriptRunner( + logger=self._logger, + user=self._user, + os_env_vars=action_env_vars, + session_working_directory=self.working_directory, + startup_directory=self.working_directory, + callback=self._action_callback, + script=step_script, + symtab=symtab, + session_files_directory=self.files_directory, + ) + + # Sets the subprocess running. + # Returns immediately after it has started, or is running + self._action_state = ActionState.RUNNING + self._state = SessionState.RUNNING + # Note: This may fail immediately (e.g. if we cannot write embedded files to disk), + # so it's important to set the action_state to RUNNING before calling run(), rather + # than after -- run() itself may end up setting the action state to FAILED. + self._runner.run() + # ========================= # Helpers diff --git a/test/openjd/sessions/test_session_run_subprocess.py b/test/openjd/sessions/test_session_run_subprocess.py new file mode 100644 index 00000000..809c86e7 --- /dev/null +++ b/test/openjd/sessions/test_session_run_subprocess.py @@ -0,0 +1,1113 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +import time +import uuid +from unittest.mock import MagicMock + +import pytest + +from openjd.model import ParameterValue +from openjd.model.v2023_09 import Environment as Environment_2023_09 +from openjd.model.v2023_09 import ( + EnvironmentVariableValueString as EnvironmentVariableValueString_2023_09, +) +from openjd.sessions import ( + ActionState, + ActionStatus, + Session, + SessionState, +) +from openjd.sessions._os_checker import is_posix, is_windows +from openjd.sessions._session_user import PosixSessionUser, WindowsSessionUser + +from .conftest import ( + has_posix_target_user, + has_windows_user, + WIN_SET_TEST_ENV_VARS_MESSAGE, + POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE, +) + + +class TestRunSubprocess: + """Tests for the Session.run_subprocess method.""" + + def test_basic_execution(self, python_exe: str, caplog: pytest.LogCaptureFixture) -> None: + """Test successful subprocess execution with simple command.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('Hello from subprocess')"], + ) + + # THEN + assert session.state == SessionState.RUNNING + assert session.action_status is not None + assert session.action_status.state == ActionState.RUNNING + + # Wait for the process to exit + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + assert session.state == SessionState.READY + assert session.action_status == ActionStatus(state=ActionState.SUCCESS, exit_code=0) + assert "Hello from subprocess" in caplog.messages + + def test_command_with_arguments( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test command with multiple arguments.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "import sys; print(' '.join(sys.argv[1:]))", "arg1", "arg2", "arg3"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + assert "arg1 arg2 arg3" in caplog.messages + + def test_special_characters_in_arguments( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that special characters in arguments are passed through correctly.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN - Use special characters + session.run_subprocess( + command=python_exe, + args=["-c", "print('$VAR %PATH% @special')"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - The literal string should be printed + assert session.state == SessionState.READY + assert "$VAR %PATH% @special" in caplog.messages + + def test_timeout_completes_before_timeout(self, python_exe: str) -> None: + """Test subprocess that completes before timeout.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "import time; time.sleep(0.1); print('done')"], + timeout=5, # 5 seconds timeout, but completes in 0.1 seconds + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + + def test_timeout_exceeds_timeout(self, python_exe: str) -> None: + """Test subprocess that exceeds timeout and is terminated.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "import time; time.sleep(10)"], + timeout=1, # 1 second timeout + ) + + # Wait for timeout + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY_ENDING + assert session.action_status is not None + assert session.action_status.state == ActionState.TIMEOUT + + def test_no_timeout(self, python_exe: str) -> None: + """Test subprocess with no timeout runs until completion.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "import time; time.sleep(0.2); print('completed')"], + timeout=None, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + + def test_env_vars_with_use_session_env_vars_true( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test environment variables with use_session_env_vars=True.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + session_env_vars = {"SESSION_VAR": "session_value"} + subprocess_env_vars = {"SUBPROCESS_VAR": "subprocess_value"} + + with Session( + session_id=session_id, + job_parameter_values=job_params, + os_env_vars=session_env_vars, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + 'import os; print(f\'{os.environ.get("SESSION_VAR", "MISSING")} {os.environ.get("SUBPROCESS_VAR", "MISSING")}\')', + ], + os_env_vars=subprocess_env_vars, + use_session_env_vars=True, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert "session_value subprocess_value" in caplog.messages + + def test_env_vars_with_use_session_env_vars_false( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that entered environment variables are excluded when use_session_env_vars=False.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + subprocess_env_vars = {"SUBPROCESS_VAR": "subprocess_value"} + + # Create an environment with a variable + environment = Environment_2023_09( + name="TestEnv", + variables={ + "ENV_VAR": EnvironmentVariableValueString_2023_09("env_value"), + }, + ) + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # Enter the environment + session.enter_environment(environment=environment) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + 'import os; print(f\'{os.environ.get("ENV_VAR", "MISSING")} {os.environ.get("SUBPROCESS_VAR", "MISSING")}\')', + ], + os_env_vars=subprocess_env_vars, + use_session_env_vars=False, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - ENV_VAR should be MISSING because use_session_env_vars=False + assert session.state == SessionState.READY + assert "MISSING subprocess_value" in caplog.messages + + def test_env_vars_precedence(self, python_exe: str, caplog: pytest.LogCaptureFixture) -> None: + """Test environment variable precedence (os_env_vars overrides).""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + session_env_vars = {"OVERRIDE_VAR": "session_value"} + subprocess_env_vars = {"OVERRIDE_VAR": "subprocess_value"} + + with Session( + session_id=session_id, + job_parameter_values=job_params, + os_env_vars=session_env_vars, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + "import os; print(os.environ.get('OVERRIDE_VAR', 'MISSING'))", + ], + os_env_vars=subprocess_env_vars, + use_session_env_vars=True, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - subprocess_value should override session_value + assert session.state == SessionState.READY + assert "subprocess_value" in caplog.messages + + @pytest.mark.parametrize( + "state", + [ + pytest.param(state, id=state.value) + for state in SessionState + if state != SessionState.READY + ], + ) + def test_state_validation_not_ready(self, state: SessionState, python_exe: str) -> None: + """Test calling from non-READY state raises RuntimeError.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session._state = state + + # THEN + with pytest.raises(RuntimeError, match="Session must be in the READY state"): + session.run_subprocess(command=python_exe, args=["-c", "print('test')"]) + + def test_state_validation_ready_succeeds(self, python_exe: str) -> None: + """Test calling from READY state succeeds.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + assert session.state == SessionState.READY + + # WHEN + session.run_subprocess(command=python_exe, args=["-c", "print('test')"]) + + # THEN + assert session.state == SessionState.RUNNING + + def test_invalid_timeout_zero(self, python_exe: str) -> None: + """Test with invalid timeout (zero).""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN/THEN + with pytest.raises(ValueError, match="timeout must be a positive integer"): + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + timeout=0, + ) + + def test_invalid_timeout_negative(self, python_exe: str) -> None: + """Test with invalid timeout (negative).""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN/THEN + with pytest.raises(ValueError, match="timeout must be a positive integer"): + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + timeout=-1, + ) + + def test_empty_command_string(self, python_exe: str) -> None: + """Test with empty command string.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN/THEN + with pytest.raises(ValueError, match="command must be a non-empty string"): + session.run_subprocess(command="", args=["-c", "print('test')"]) + + def test_whitespace_only_command(self, python_exe: str) -> None: + """Test with whitespace-only command string.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN/THEN + with pytest.raises(ValueError, match="command must be a non-empty string"): + session.run_subprocess(command=" ", args=["-c", "print('test')"]) + + def test_nonexistent_command(self) -> None: + """Test with non-existent command.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command="nonexistent_command_12345", + args=["arg1"], + ) + + # Wait for failure + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY_ENDING + assert session.action_status is not None + assert session.action_status.state == ActionState.FAILED + + def test_callback_integration(self, python_exe: str) -> None: + """Test callback invocation during subprocess execution.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert callback.call_count >= 1 + # Check that callback was called with session_id and ActionStatus + callback.assert_called_with( + session_id, ActionStatus(state=ActionState.SUCCESS, exit_code=0) + ) + + @pytest.mark.skipif(not is_posix(), reason="Posix-only test.") + @pytest.mark.xfail( + not has_posix_target_user(), + reason=POSIX_SET_TARGET_USER_ENV_VARS_MESSAGE, + ) + def test_user_context_posix( + self, posix_target_user: PosixSessionUser, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test subprocess runs as configured user on POSIX.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + user=posix_target_user, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "import os; print(os.getuid())"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + + @pytest.mark.skipif(not is_windows(), reason="Windows-only test.") + @pytest.mark.xfail(not has_windows_user(), reason=WIN_SET_TEST_ENV_VARS_MESSAGE) + @pytest.mark.timeout(90) + def test_user_context_windows(self, windows_user: WindowsSessionUser, python_exe: str) -> None: + """Test subprocess runs as configured user on Windows.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + user=windows_user, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "import os; print(os.getlogin())"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + + def test_multiple_sequential_calls( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test multiple run_subprocess calls in sequence.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN - First subprocess + session.run_subprocess( + command=python_exe, + args=["-c", "print('first')"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + assert session.state == SessionState.READY + assert "first" in caplog.messages + + # WHEN - Second subprocess + session.run_subprocess( + command=python_exe, + args=["-c", "print('second')"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert "second" in caplog.messages + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + + def test_subprocess_with_entered_environment( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test run_subprocess within environment context with use_session_env_vars=True.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + environment = Environment_2023_09( + name="TestEnv", + variables={ + "TEST_ENV_VAR": EnvironmentVariableValueString_2023_09("test_value"), + }, + ) + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # Enter environment + session.enter_environment(environment=environment) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + "import os; print(os.environ.get('TEST_ENV_VAR', 'MISSING'))", + ], + use_session_env_vars=True, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert "test_value" in caplog.messages + + def test_log_banner_message_with_custom_message( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that log_banner_message logs a custom banner when provided.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + custom_message = "Running Custom Subprocess" + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + log_banner_message=custom_message, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Check that the banner was logged with the custom message + assert "==============================================" in caplog.text + assert f"--------- {custom_message}" in caplog.text + + def test_log_banner_message_none_no_banner( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that no banner is logged when log_banner_message is None.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + caplog.clear() + + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + log_banner_message=None, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Check that no custom banner was logged when log_banner_message is None + # There might be other banners from session setup, but we verify the custom message isn't there + assert ( + "--------- Running" not in caplog.text + or "--------- Running Custom Subprocess" not in caplog.text + ) + + def test_log_banner_message_empty_string_no_banner( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that no banner is logged when log_banner_message is an empty string.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + caplog.clear() + + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + log_banner_message="", + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Empty string is falsy, so no banner should be logged + # Verify no empty message banner - check that all banner messages are from expected sources + assert "--------- " not in caplog.text or caplog.text.count( + "--------- " + ) == caplog.text.count("--------- Running") + caplog.text.count( + "--------- Entering" + ) + caplog.text.count( + "--------- Exiting" + ) + + def test_log_banner_message_with_special_characters( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that log_banner_message handles special characters correctly.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + custom_message = "Running: Test $VAR & Special " + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('test')"], + log_banner_message=custom_message, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Check that the banner was logged with the special characters intact + assert f"--------- {custom_message}" in caplog.text + + def test_log_banner_message_multiple_calls_different_messages( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test multiple run_subprocess calls with different banner messages.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN - First subprocess with banner + session.run_subprocess( + command=python_exe, + args=["-c", "print('first')"], + log_banner_message="First Subprocess", + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + assert "--------- First Subprocess" in caplog.text + + # WHEN - Second subprocess with different banner + session.run_subprocess( + command=python_exe, + args=["-c", "print('second')"], + log_banner_message="Second Subprocess", + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + assert "--------- First Subprocess" in caplog.text + assert "--------- Second Subprocess" in caplog.text + + def test_openjd_progress_message_triggers_callback( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that openjd_progress messages trigger callback with progress value.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('openjd_progress: 50.0')"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Verify callback was called with progress + progress_calls = [ + call + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].progress == 50.0 + ] + assert len(progress_calls) > 0, "Callback should be invoked with progress=50.0" + + def test_openjd_status_message_triggers_callback( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that openjd_status messages trigger callback with status message.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('openjd_status: Processing data')"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Verify callback was called with status message + status_calls = [ + call + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].status_message == "Processing data" + ] + assert ( + len(status_calls) > 0 + ), "Callback should be invoked with status_message='Processing data'" + + def test_multiple_openjd_progress_messages( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test multiple openjd_progress messages trigger callbacks with correct values.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + "print('openjd_progress: 25.0'); print('openjd_progress: 50.0'); print('openjd_progress: 75.0')", + ], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Verify callbacks were called with each progress value + progress_values = [ + call[0][1].progress + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].progress is not None + ] + assert 25.0 in progress_values, "Callback should be invoked with progress=25.0" + assert 50.0 in progress_values, "Callback should be invoked with progress=50.0" + assert 75.0 in progress_values, "Callback should be invoked with progress=75.0" + + def test_multiple_openjd_status_messages( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test multiple openjd_status messages trigger callbacks with correct values.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + "print('openjd_status: Starting'); print('openjd_status: Processing'); print('openjd_status: Finishing')", + ], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + # Verify callbacks were called with each status message + status_messages = [ + call[0][1].status_message + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].status_message is not None + ] + assert ( + "Starting" in status_messages + ), "Callback should be invoked with status_message='Starting'" + assert ( + "Processing" in status_messages + ), "Callback should be invoked with status_message='Processing'" + assert ( + "Finishing" in status_messages + ), "Callback should be invoked with status_message='Finishing'" + + def test_openjd_progress_and_status_combined( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that both openjd_progress and openjd_status messages work together.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=[ + "-c", + "print('openjd_status: Starting task'); print('openjd_progress: 33.3'); print('openjd_status: Halfway done'); print('openjd_progress: 66.6'); print('openjd_status: Almost complete'); print('openjd_progress: 100.0')", + ], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY + + # Verify progress callbacks + progress_values = [ + call[0][1].progress + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].progress is not None + ] + assert 33.3 in progress_values + assert 66.6 in progress_values + assert 100.0 in progress_values + + # Verify status callbacks + status_messages = [ + call[0][1].status_message + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].status_message is not None + ] + assert "Starting task" in status_messages + assert "Halfway done" in status_messages + assert "Almost complete" in status_messages + + def test_openjd_fail_message_triggers_callback( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that openjd_fail messages trigger callback with fail message.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN + session.run_subprocess( + command=python_exe, + args=["-c", "print('openjd_fail: Something went wrong'); import sys; sys.exit(1)"], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN + assert session.state == SessionState.READY_ENDING + # Verify callback was called with fail message + fail_calls = [ + call + for call in callback.call_args_list + if len(call[0]) == 2 and call[0][1].fail_message == "Something went wrong" + ] + assert ( + len(fail_calls) > 0 + ), "Callback should be invoked with fail_message='Something went wrong'" + + def test_openjd_progress_invalid_value_ignored( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test that invalid openjd_progress values are ignored and don't crash.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + callback = MagicMock() + + with Session( + session_id=session_id, + job_parameter_values=job_params, + callback=callback, + ) as session: + # WHEN - Use invalid progress values + session.run_subprocess( + command=python_exe, + args=[ + "-c", + "print('openjd_progress: not_a_number'); print('openjd_progress: -10'); print('openjd_progress: 150'); print('test completed')", + ], + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - Should complete successfully despite invalid progress values + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + assert "test completed" in caplog.text + + def test_cancelation_of_running_subprocess(self, python_exe: str) -> None: + """Test cancelation of a running subprocess.""" + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN - Start a long-running subprocess + session.run_subprocess( + command=python_exe, + args=["-c", "import time; print('Starting'); time.sleep(10); print('End')"], + ) + + # Give it a moment to start + time.sleep(0.5) + + # Cancel the action while it's running + session.cancel_action() + + # Wait for the process to exit + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - Verify terminate-based cancelation behavior + assert session.state == SessionState.READY_ENDING + assert session.action_status is not None + assert session.action_status.state == ActionState.CANCELED + + if is_posix(): + # On POSIX systems, SIGKILL results in exit code -9 + assert session.action_status.exit_code == -9 + else: + # On Windows, process termination results in exit code 15 + assert session.action_status.exit_code == 15 + + +class TestRunSubprocessIntegration: + """Integration tests for Session.run_subprocess method.""" + + def test_run_subprocess_within_environment_context( + self, python_exe: str, caplog: pytest.LogCaptureFixture + ) -> None: + """Test run_subprocess within environment context with both use_session_env_vars values. + + This integration test verifies: + - Environment variables are inherited when use_session_env_vars=True + - Environment variables are NOT inherited when use_session_env_vars=False + - Proper environment lifecycle (enter, run subprocesses, exit) + """ + # GIVEN + session_id = uuid.uuid4().hex + job_params = dict[str, ParameterValue]() + + # Create an environment with test variables + environment = Environment_2023_09( + name="TestEnv", + variables={ + "ENV_VAR_1": EnvironmentVariableValueString_2023_09("env_value_1"), + "ENV_VAR_2": EnvironmentVariableValueString_2023_09("env_value_2"), + }, + ) + + with Session(session_id=session_id, job_parameter_values=job_params) as session: + # WHEN - Enter the environment + identifier = session.enter_environment(environment=environment) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + assert session.state == SessionState.READY + + # WHEN - Run subprocess with use_session_env_vars=True + caplog.clear() + session.run_subprocess( + command=python_exe, + args=[ + "-c", + 'import os; v1 = os.environ.get("ENV_VAR_1", "MISSING"); v2 = os.environ.get("ENV_VAR_2", "MISSING"); print(f"ENV_VAR_1={v1} ENV_VAR_2={v2}")', + ], + use_session_env_vars=True, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - Verify environment variables are inherited + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + assert "ENV_VAR_1=env_value_1 ENV_VAR_2=env_value_2" in caplog.text + + # WHEN - Run subprocess with use_session_env_vars=False + caplog.clear() + session.run_subprocess( + command=python_exe, + args=[ + "-c", + 'import os; v1 = os.environ.get("ENV_VAR_1", "MISSING"); v2 = os.environ.get("ENV_VAR_2", "MISSING"); print(f"ENV_VAR_1={v1} ENV_VAR_2={v2}")', + ], + use_session_env_vars=False, + ) + + # Wait for completion + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - Verify environment variables are NOT inherited + assert session.state == SessionState.READY + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS + assert "ENV_VAR_1=MISSING ENV_VAR_2=MISSING" in caplog.text + + # WHEN - Exit the environment + session.exit_environment(identifier=identifier) + while session.state == SessionState.RUNNING: + time.sleep(0.1) + + # THEN - Session should be in READY_ENDING state after exiting environment + assert session.state == SessionState.READY_ENDING + assert session.action_status is not None + assert session.action_status.state == ActionState.SUCCESS