Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/news.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ next (unreleased)

Lab
^^^
* Add ``wall_time_limit`` parameter to ``add_command()`` to allow explicit wall-clock time limits (Jendrik Seipp).
* Remove tests for Python 3.8 and 3.9. Add tests for Python 3.14 (Jendrik Seipp).

Downward Lab
Expand Down
71 changes: 45 additions & 26 deletions lab/calls/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(
args,
name,
time_limit=None,
wall_time_limit=None,
memory_limit=None,
soft_stdout_limit=None,
hard_stdout_limit=None,
Expand All @@ -124,23 +125,33 @@ def __init__(
2. Monitoring the cumulative CPU time of the process and all its child
processes, terminating the process tree if the total exceeds the limit.

The *wall_time_limit* parameter enforces a wall-clock time limit. If not
set and *time_limit* is provided, it defaults to
max(30, time_limit * 1.5) seconds. If both *time_limit* and
*wall_time_limit* are None, no wall-clock time limit is enforced.

See also the documentation for
``lab.experiment._Buildable.add_command()``.

"""
assert "stdin" not in kwargs, "redirecting stdin is not supported"
self.name = name
self.time_limit = time_limit
self.cpu_time_limit = time_limit
self.cpu_time = None
self.wall_clock_start_time = None
# Track CPU time per PID to handle sequential children.
self.pid_cpu_times = {} # {pid: last_observed_cpu_time}
self.finalized_cpu_time = 0.0 # Accumulated CPU time from terminated processes

if time_limit is None:
self.wall_clock_time_limit = None
else:
# Enforce minimum on wall-clock limit to account for disk latencies.
# Set wall-clock time limit
if wall_time_limit is not None:
self.wall_clock_time_limit = wall_time_limit
elif time_limit is not None:
# Default: Enforce minimum on wall-clock limit to account for
# disk latencies.
self.wall_clock_time_limit = max(30, time_limit * 1.5)
else:
self.wall_clock_time_limit = None

def get_bytes(limit):
return None if limit is None else int(limit * 1024)
Expand Down Expand Up @@ -225,23 +236,23 @@ def _update_cpu_time(self):
except (OSError, AttributeError):
return None

def _monitor_cpu_time(self):
def _monitor_time_limits(self):
"""
Monitor the CPU time of the process and all its children.
Terminate the process if it exceeds the time limit.
Monitor the CPU time and wall-clock time of the process.
Terminate the process if it exceeds either limit.
"""
while self.process.poll() is None:
# Check CPU time limit
total_cpu_time = self._update_cpu_time()

if total_cpu_time is None:
# Process may have terminated.
break

# Check if CPU time limit is exceeded.
if self.time_limit is not None and total_cpu_time > self.time_limit:
if self.cpu_time_limit is not None and total_cpu_time > self.cpu_time_limit:
logging.info(
f"{self.name} exceeded CPU time limit: "
f"{total_cpu_time:.2f}s > {self.time_limit}s"
f"{total_cpu_time:.2f}s > {self.cpu_time_limit}s"
)
self.process.terminate()
# Give it a moment to terminate gracefully.
Expand All @@ -250,6 +261,22 @@ def _monitor_cpu_time(self):
self.process.kill()
break

# Check wall-clock time limit
if self.wall_clock_time_limit is not None:
assert self.wall_clock_start_time is not None
wall_clock_time = time.monotonic() - self.wall_clock_start_time
if wall_clock_time > self.wall_clock_time_limit:
logging.info(
f"{self.name} exceeded wall-clock time limit: "
f"{wall_clock_time:.2f}s > {self.wall_clock_time_limit}s"
)
self.process.terminate()
# Give it a moment to terminate gracefully.
time.sleep(1)
if self.process.poll() is None:
self.process.kill()
break

time.sleep(self.CPU_TIME_CHECK_INTERVAL)

def cpu_time_limit_exceeded(self, use_slack=False):
Expand All @@ -260,9 +287,9 @@ def cpu_time_limit_exceeded(self, use_slack=False):
measurement granularity.

"""
if self.time_limit is None or self.cpu_time is None:
if self.cpu_time_limit is None or self.cpu_time is None:
return False
limit = self.time_limit
limit = self.cpu_time_limit
if use_slack:
limit += self.CPU_TIME_CHECK_INTERVAL
return self.cpu_time > limit
Expand Down Expand Up @@ -357,13 +384,13 @@ def close_unregister_and_remove(fd):
)

def wait(self):
wall_clock_start_time = time.monotonic()
self.wall_clock_start_time = time.monotonic()

# Start CPU time monitoring thread if time limit is set.
# Start monitoring thread if any time limit is set.
monitor_thread = None
if self.time_limit is not None:
if self.cpu_time_limit is not None or self.wall_clock_time_limit is not None:
monitor_thread = threading.Thread(
target=self._monitor_cpu_time, daemon=True
target=self._monitor_time_limits, daemon=True
)
monitor_thread.start()

Expand All @@ -387,20 +414,12 @@ def wait(self):
for file in self.opened_files:
file.close()

wall_clock_time = time.monotonic() - wall_clock_start_time
wall_clock_time = time.monotonic() - self.wall_clock_start_time
logging.info(f"{self.name} wall-clock time: {wall_clock_time:.2f}s")

# Report CPU time including children.
if self.cpu_time is not None:
logging.info(f"{self.name} CPU time: {self.cpu_time:.2f}s")

if (
self.wall_clock_time_limit is not None
and wall_clock_time > self.wall_clock_time_limit
):
logging.error(
f"wall-clock time for {self.name} too high: "
f"{wall_clock_time:.2f}s > {self.time_limit}s"
)
logging.info(f"{self.name} exit code: {retcode}")
return retcode
7 changes: 7 additions & 0 deletions lab/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def add_command(
name,
command,
time_limit=None,
wall_time_limit=None,
memory_limit=None,
soft_stdout_limit=1024,
hard_stdout_limit=10 * 1024,
Expand Down Expand Up @@ -206,6 +207,11 @@ def add_command(
time spent across all threads of the command is the sum of time
spent across all threads of the command and its descendants.

The *wall_time_limit* parameter specifies the wall-clock time limit in
seconds. If not set and *time_limit* is provided, it defaults to
max(30, time_limit * 1.5) seconds. If both *time_limit* and
*wall_time_limit* are None, no wall-clock time limit is enforced.

The command is aborted with SIGKILL when any of its threads
uses more than *memory_limit* MiB.

Expand Down Expand Up @@ -270,6 +276,7 @@ def add_command(
if "stdin" in kwargs:
logging.critical("redirecting stdin is not supported")
kwargs["time_limit"] = time_limit
kwargs["wall_time_limit"] = wall_time_limit
kwargs["memory_limit"] = memory_limit
kwargs["soft_stdout_limit"] = soft_stdout_limit
kwargs["hard_stdout_limit"] = hard_stdout_limit
Expand Down
108 changes: 108 additions & 0 deletions tests/test_cpu_time_limit.py → tests/test_time_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,111 @@ def test_sequential_children_cpu_time_accumulated(temp_script):
f"Should accumulate sequential children's CPU time (~1s + ~0.5s = ~1.5s), "
f"but got {call.cpu_time:.2f}s."
)


def test_wall_time_default_calculation(temp_script):
"""Test that wall_time defaults to max(30, time_limit * 1.5) when not specified."""
# When time_limit = 10, wall_time should be max(30, 10 * 1.5) = 30
call = Call(
[sys.executable, "-c", "print('test')"],
name="wall-time-default-30",
time_limit=10,
)
call.wait()
assert call.wall_clock_time_limit == 30

# When time_limit = 100, wall_time should be max(30, 100 * 1.5) = 150
call = Call(
[sys.executable, "-c", "print('test')"],
name="wall-time-default-150",
time_limit=100,
)
call.wait()
assert call.wall_clock_time_limit == 150

# When time_limit is None, wall_time should be None
call = Call(
[sys.executable, "-c", "print('test')"],
name="wall-time-none",
time_limit=None,
)
call.wait()
assert call.wall_clock_time_limit is None


def test_wall_time_explicit_parameter(temp_script):
"""Test that explicit wall_time_limit parameter overrides the default."""
# Set explicit wall_time_limit = 60, even though time_limit would suggest 30
call = Call(
[sys.executable, "-c", "print('test')"],
name="wall-time-explicit",
time_limit=10,
wall_time_limit=60,
)
call.wait()
assert call.wall_clock_time_limit == 60

# Set wall_time_limit without time_limit
call = Call(
[sys.executable, "-c", "print('test')"],
name="wall-time-only",
time_limit=None,
wall_time_limit=45,
)
call.wait()
assert call.wall_clock_time_limit == 45


def test_wall_time_logging(temp_script, caplog):
"""Test that wall-clock time is properly logged after command execution."""
import logging

script = temp_script("""
import time
time.sleep(0.5)
print('Done')
""")

with caplog.at_level(logging.INFO):
call = Call(
[sys.executable, script],
name="wall-time-logging",
time_limit=10,
)
call.wait()

# Check that wall-clock time is logged
assert any(
"wall-time-logging wall-clock time:" in record.message
for record in caplog.records
)


def test_wall_time_limit_exceeded(temp_script, caplog):
"""Test that exceeding wall-clock time limit terminates the process."""
import logging

script = temp_script("""
import time
# Sleep longer than the wall-clock time limit
time.sleep(5)
print('Done')
""")

with caplog.at_level(logging.INFO):
call = Call(
[sys.executable, script],
name="wall-time-exceeded",
time_limit=None,
wall_time_limit=1,
)
retcode = call.wait()

# Check that wall-clock time limit exceeded is logged
assert any(
"wall-time-exceeded exceeded wall-clock time limit:" in record.message
for record in caplog.records
), "Should log info when wall-clock time limit is exceeded"

# Process should have been terminated (negative return code or specific exit code)
assert retcode != 0, "Process should have been terminated"