diff --git a/docs/news.rst b/docs/news.rst index 8137d6f98..bb1670003 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -6,6 +6,7 @@ next (unreleased) Lab ^^^ +* Add ``wall_time_limit`` parameter to ``add_command()`` to allow explicit wall-clock time limits (Jendrik Seipp). * Remove tests for Python 3.8 and 3.9. Add tests for Python 3.14 (Jendrik Seipp). Downward Lab diff --git a/lab/calls/call.py b/lab/calls/call.py index 52d52329d..29b84eef2 100644 --- a/lab/calls/call.py +++ b/lab/calls/call.py @@ -107,6 +107,7 @@ def __init__( args, name, time_limit=None, + wall_time_limit=None, memory_limit=None, soft_stdout_limit=None, hard_stdout_limit=None, @@ -124,23 +125,33 @@ def __init__( 2. Monitoring the cumulative CPU time of the process and all its child processes, terminating the process tree if the total exceeds the limit. + The *wall_time_limit* parameter enforces a wall-clock time limit. If not + set and *time_limit* is provided, it defaults to + max(30, time_limit * 1.5) seconds. If both *time_limit* and + *wall_time_limit* are None, no wall-clock time limit is enforced. + See also the documentation for ``lab.experiment._Buildable.add_command()``. """ assert "stdin" not in kwargs, "redirecting stdin is not supported" self.name = name - self.time_limit = time_limit + self.cpu_time_limit = time_limit self.cpu_time = None + self.wall_clock_start_time = None # Track CPU time per PID to handle sequential children. self.pid_cpu_times = {} # {pid: last_observed_cpu_time} self.finalized_cpu_time = 0.0 # Accumulated CPU time from terminated processes - if time_limit is None: - self.wall_clock_time_limit = None - else: - # Enforce minimum on wall-clock limit to account for disk latencies. + # Set wall-clock time limit + if wall_time_limit is not None: + self.wall_clock_time_limit = wall_time_limit + elif time_limit is not None: + # Default: Enforce minimum on wall-clock limit to account for + # disk latencies. self.wall_clock_time_limit = max(30, time_limit * 1.5) + else: + self.wall_clock_time_limit = None def get_bytes(limit): return None if limit is None else int(limit * 1024) @@ -225,23 +236,23 @@ def _update_cpu_time(self): except (OSError, AttributeError): return None - def _monitor_cpu_time(self): + def _monitor_time_limits(self): """ - Monitor the CPU time of the process and all its children. - Terminate the process if it exceeds the time limit. + Monitor the CPU time and wall-clock time of the process. + Terminate the process if it exceeds either limit. """ while self.process.poll() is None: + # Check CPU time limit total_cpu_time = self._update_cpu_time() if total_cpu_time is None: # Process may have terminated. break - # Check if CPU time limit is exceeded. - if self.time_limit is not None and total_cpu_time > self.time_limit: + if self.cpu_time_limit is not None and total_cpu_time > self.cpu_time_limit: logging.info( f"{self.name} exceeded CPU time limit: " - f"{total_cpu_time:.2f}s > {self.time_limit}s" + f"{total_cpu_time:.2f}s > {self.cpu_time_limit}s" ) self.process.terminate() # Give it a moment to terminate gracefully. @@ -250,6 +261,22 @@ def _monitor_cpu_time(self): self.process.kill() break + # Check wall-clock time limit + if self.wall_clock_time_limit is not None: + assert self.wall_clock_start_time is not None + wall_clock_time = time.monotonic() - self.wall_clock_start_time + if wall_clock_time > self.wall_clock_time_limit: + logging.info( + f"{self.name} exceeded wall-clock time limit: " + f"{wall_clock_time:.2f}s > {self.wall_clock_time_limit}s" + ) + self.process.terminate() + # Give it a moment to terminate gracefully. + time.sleep(1) + if self.process.poll() is None: + self.process.kill() + break + time.sleep(self.CPU_TIME_CHECK_INTERVAL) def cpu_time_limit_exceeded(self, use_slack=False): @@ -260,9 +287,9 @@ def cpu_time_limit_exceeded(self, use_slack=False): measurement granularity. """ - if self.time_limit is None or self.cpu_time is None: + if self.cpu_time_limit is None or self.cpu_time is None: return False - limit = self.time_limit + limit = self.cpu_time_limit if use_slack: limit += self.CPU_TIME_CHECK_INTERVAL return self.cpu_time > limit @@ -357,13 +384,13 @@ def close_unregister_and_remove(fd): ) def wait(self): - wall_clock_start_time = time.monotonic() + self.wall_clock_start_time = time.monotonic() - # Start CPU time monitoring thread if time limit is set. + # Start monitoring thread if any time limit is set. monitor_thread = None - if self.time_limit is not None: + if self.cpu_time_limit is not None or self.wall_clock_time_limit is not None: monitor_thread = threading.Thread( - target=self._monitor_cpu_time, daemon=True + target=self._monitor_time_limits, daemon=True ) monitor_thread.start() @@ -387,20 +414,12 @@ def wait(self): for file in self.opened_files: file.close() - wall_clock_time = time.monotonic() - wall_clock_start_time + wall_clock_time = time.monotonic() - self.wall_clock_start_time logging.info(f"{self.name} wall-clock time: {wall_clock_time:.2f}s") # Report CPU time including children. if self.cpu_time is not None: logging.info(f"{self.name} CPU time: {self.cpu_time:.2f}s") - if ( - self.wall_clock_time_limit is not None - and wall_clock_time > self.wall_clock_time_limit - ): - logging.error( - f"wall-clock time for {self.name} too high: " - f"{wall_clock_time:.2f}s > {self.time_limit}s" - ) logging.info(f"{self.name} exit code: {retcode}") return retcode diff --git a/lab/experiment.py b/lab/experiment.py index 12afdc7f2..af27ec78a 100644 --- a/lab/experiment.py +++ b/lab/experiment.py @@ -179,6 +179,7 @@ def add_command( name, command, time_limit=None, + wall_time_limit=None, memory_limit=None, soft_stdout_limit=1024, hard_stdout_limit=10 * 1024, @@ -206,6 +207,11 @@ def add_command( time spent across all threads of the command is the sum of time spent across all threads of the command and its descendants. + The *wall_time_limit* parameter specifies the wall-clock time limit in + seconds. If not set and *time_limit* is provided, it defaults to + max(30, time_limit * 1.5) seconds. If both *time_limit* and + *wall_time_limit* are None, no wall-clock time limit is enforced. + The command is aborted with SIGKILL when any of its threads uses more than *memory_limit* MiB. @@ -270,6 +276,7 @@ def add_command( if "stdin" in kwargs: logging.critical("redirecting stdin is not supported") kwargs["time_limit"] = time_limit + kwargs["wall_time_limit"] = wall_time_limit kwargs["memory_limit"] = memory_limit kwargs["soft_stdout_limit"] = soft_stdout_limit kwargs["hard_stdout_limit"] = hard_stdout_limit diff --git a/tests/test_cpu_time_limit.py b/tests/test_time_limits.py similarity index 77% rename from tests/test_cpu_time_limit.py rename to tests/test_time_limits.py index e7d7bdecd..cd827beae 100644 --- a/tests/test_cpu_time_limit.py +++ b/tests/test_time_limits.py @@ -365,3 +365,111 @@ def test_sequential_children_cpu_time_accumulated(temp_script): f"Should accumulate sequential children's CPU time (~1s + ~0.5s = ~1.5s), " f"but got {call.cpu_time:.2f}s." ) + + +def test_wall_time_default_calculation(temp_script): + """Test that wall_time defaults to max(30, time_limit * 1.5) when not specified.""" + # When time_limit = 10, wall_time should be max(30, 10 * 1.5) = 30 + call = Call( + [sys.executable, "-c", "print('test')"], + name="wall-time-default-30", + time_limit=10, + ) + call.wait() + assert call.wall_clock_time_limit == 30 + + # When time_limit = 100, wall_time should be max(30, 100 * 1.5) = 150 + call = Call( + [sys.executable, "-c", "print('test')"], + name="wall-time-default-150", + time_limit=100, + ) + call.wait() + assert call.wall_clock_time_limit == 150 + + # When time_limit is None, wall_time should be None + call = Call( + [sys.executable, "-c", "print('test')"], + name="wall-time-none", + time_limit=None, + ) + call.wait() + assert call.wall_clock_time_limit is None + + +def test_wall_time_explicit_parameter(temp_script): + """Test that explicit wall_time_limit parameter overrides the default.""" + # Set explicit wall_time_limit = 60, even though time_limit would suggest 30 + call = Call( + [sys.executable, "-c", "print('test')"], + name="wall-time-explicit", + time_limit=10, + wall_time_limit=60, + ) + call.wait() + assert call.wall_clock_time_limit == 60 + + # Set wall_time_limit without time_limit + call = Call( + [sys.executable, "-c", "print('test')"], + name="wall-time-only", + time_limit=None, + wall_time_limit=45, + ) + call.wait() + assert call.wall_clock_time_limit == 45 + + +def test_wall_time_logging(temp_script, caplog): + """Test that wall-clock time is properly logged after command execution.""" + import logging + + script = temp_script(""" +import time +time.sleep(0.5) +print('Done') +""") + + with caplog.at_level(logging.INFO): + call = Call( + [sys.executable, script], + name="wall-time-logging", + time_limit=10, + ) + call.wait() + + # Check that wall-clock time is logged + assert any( + "wall-time-logging wall-clock time:" in record.message + for record in caplog.records + ) + + +def test_wall_time_limit_exceeded(temp_script, caplog): + """Test that exceeding wall-clock time limit terminates the process.""" + import logging + + script = temp_script(""" +import time +# Sleep longer than the wall-clock time limit +time.sleep(5) +print('Done') +""") + + with caplog.at_level(logging.INFO): + call = Call( + [sys.executable, script], + name="wall-time-exceeded", + time_limit=None, + wall_time_limit=1, + ) + retcode = call.wait() + + # Check that wall-clock time limit exceeded is logged + assert any( + "wall-time-exceeded exceeded wall-clock time limit:" in record.message + for record in caplog.records + ), "Should log info when wall-clock time limit is exceeded" + + # Process should have been terminated (negative return code or specific exit code) + assert retcode != 0, "Process should have been terminated"