Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This document provides guidance for AI agents and contributors working on the `p
- **Package name**: `py-app-dev`
- **Python version**: 3.10+
- **Package manager**: [uv](https://docs.astral.sh/uv/)
- **Build backend**: Poetry
- **Build backend**: uv
- **CI/CD**: GitHub Actions with semantic-release

## Repository Structure
Expand Down
1,827 changes: 0 additions & 1,827 deletions poetry.lock

This file was deleted.

2 changes: 0 additions & 2 deletions poetry.toml

This file was deleted.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ classifiers = [
dependencies = [
"mashumaro<4.0,>=3.5",
"loguru<1.0,>=0.7",
"psutil>=7.1,<8.0",
]
urls."Bug Tracker" = "https://github.com/cuinixam/py-app-dev/issues"
urls.Changelog = "https://github.com/cuinixam/py-app-dev/blob/main/CHANGELOG.md"
Expand Down Expand Up @@ -95,6 +96,9 @@ addopts = """\
--cov-report=xml
"""
pythonpath = [ "src" ]
markers = [
"exploratory: local-only exploratory tests; skipped by default (run with --run-exploratory)",
]

[tool.coverage.run]
branch = true
Expand Down
134 changes: 110 additions & 24 deletions src/py_app_dev/core/subprocess.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import contextlib
import locale
import queue
import shutil
import subprocess # nosec
import threading
import time
from pathlib import Path
from typing import Any

import psutil

from .exceptions import UserNotificationException
from .logging import logger

Expand All @@ -23,6 +29,7 @@ class SubprocessExecutor:
capture_output: If True, the output of the command will be captured.
print_output: If True, the output of the command will be printed to the logger.
One can set this to false in order to get the output in the returned CompletedProcess object.
timeout: Maximum runtime in seconds before the subprocess is forcefully terminated. If None, there is no timeout.

"""

Expand All @@ -34,6 +41,7 @@ def __init__(
env: dict[str, str] | None = None,
shell: bool = False,
print_output: bool = True,
timeout: int | None = None,
):
self.logger = logger.bind()
self.command = command
Expand All @@ -42,58 +50,136 @@ def __init__(
self.env = env
self.shell = shell
self.print_output = print_output
self.timeout = timeout

@property
def command_str(self) -> str:
if isinstance(self.command, str):
return self.command
return " ".join(str(arg) if not isinstance(arg, str) else arg for arg in self.command)

def _finalize_process(
self,
process: subprocess.Popen[Any] | None,
) -> None:
"""
Clean up a subprocess and its children.

If the process is still running it is forcibly killed (children first,
then parent). This is a no-op when the process has already finished.
"""
if process is None:
return
wait_timeout_seconds: float = 5.0
try:
if process.poll() is None:
try:
parent = psutil.Process(process.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied):
parent = None

if parent:
for child in parent.children(recursive=True):
with contextlib.suppress(Exception):
child.kill()
with contextlib.suppress(Exception):
parent.kill()
except Exception as exc:
self.logger.error(f"Failed to kill process tree for PID {getattr(process, 'pid', 'unknown')}: {exc}")

with contextlib.suppress(BaseException):
process.wait(timeout=wait_timeout_seconds)

for pipe in (process.stdin, process.stdout, process.stderr):
if pipe:
with contextlib.suppress(Exception):
pipe.close()

def execute(self, handle_errors: bool = True) -> subprocess.CompletedProcess[Any] | None:
"""Execute the command and return the CompletedProcess object if handle_errors is False."""
start_time = time.monotonic()
completed_process: subprocess.CompletedProcess[Any] | None = None
stdout = ""
stderr = ""
self.logger.info(f"Running command: {self.command_str}")
cwd_path = (self.current_working_directory or Path.cwd()).as_posix()
process: subprocess.Popen[str] | None = None
streaming_active = False

try:
completed_process = None
stdout = ""
stderr = ""
self.logger.info(f"Running command: {self.command_str}")
cwd_path = (self.current_working_directory or Path.cwd()).as_posix()
with subprocess.Popen(
process = subprocess.Popen(
args=self.command,
cwd=cwd_path,
# Combine both streams to stdout (when captured)
stdout=(subprocess.PIPE if self.capture_output else subprocess.DEVNULL),
stderr=(subprocess.STDOUT if self.capture_output else subprocess.DEVNULL),
# enables line buffering, line is flushed after each \n
bufsize=1,
bufsize=1, # line buffering, flushed after each \n
text=True,
# every new line is a \n
universal_newlines=True,
# decode bytes to str using current locale/system encoding
universal_newlines=True, # normalize line endings to \n
encoding=locale.getpreferredencoding(False),
# replace unknown characters with �
errors="replace",
errors="replace", # replace undecodable bytes with �
env=self.env,
shell=self.shell,
) as process: # nosec
if self.capture_output and process.stdout is not None:
if self.print_output:
for line in iter(process.stdout.readline, ""):
self.logger.info(line.strip())
stdout += line
process.wait()
else:
stdout, stderr = process.communicate()
)
# STREAMING OUTPUT MODE
if self.capture_output and self.print_output and process.stdout is not None:
# Drain stdout on a background thread so the main loop is never
# blocked by readline() when the process is silent. A sentinel
# None is pushed after EOF so the consumer knows when to stop.
streaming_active = True
line_queue: queue.Queue[str | None] = queue.Queue()

def _reader(src: Any, dest: queue.Queue[str | None]) -> None:
for ln in src:
dest.put(ln)
dest.put(None)

reader_thread = threading.Thread(target=_reader, args=(process.stdout, line_queue), daemon=True)
reader_thread.start()

while True:
# Timeout check runs even when the process writes nothing
if self.timeout is not None and (time.monotonic() - start_time) > self.timeout:
raise subprocess.TimeoutExpired(
cmd=self.command_str,
timeout=self.timeout,
output=stdout,
)
try:
line = line_queue.get(timeout=0.05)
except queue.Empty:
continue
if line is None: # EOF sentinel
break
self.logger.info(line.strip())
stdout += line

reader_thread.join()
process.wait()
# NON-STREAMING MODE
elif self.capture_output and not self.print_output:
stdout, stderr = process.communicate(timeout=self.timeout)
else:
# No output capturing; just wait for completion
process.wait(timeout=self.timeout)

if handle_errors:
# Check return code
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, self.command_str)
raise subprocess.CalledProcessError(process.returncode, self.command_str, stdout, stderr)
else:
completed_process = subprocess.CompletedProcess(process.args, process.returncode, stdout, stderr)
except subprocess.TimeoutExpired:
raise UserNotificationException(f"Command '{self.command_str}' timed out after {self.timeout} seconds and was forcefully terminated.") from None
except subprocess.CalledProcessError as e:
raise UserNotificationException(f"Command '{self.command_str}' execution failed with return code {e.returncode}") from None
except FileNotFoundError as e:
raise UserNotificationException(f"Command '{self.command_str}' could not be executed. Failed with error {e}") from None
except KeyboardInterrupt:
raise UserNotificationException(f"Command '{self.command_str}' execution interrupted by user") from None
finally:
# Kill the process first so pipe EOF unblocks the reader thread
self._finalize_process(process=process)
if streaming_active:
with contextlib.suppress(Exception):
reader_thread.join(timeout=2.0)
return completed_process
Loading