From 65fd5c32581a6de958710907d90e88c5c80b8e92 Mon Sep 17 00:00:00 2001 From: actflower Date: Wed, 14 Jan 2026 16:52:42 +0000 Subject: [PATCH 1/2] Add max_running_containers limit for resource management When multiple repositories are in use, each can have containers running across different dev names. This change adds a configurable limit on the maximum number of containers that can be running simultaneously. When the limit is reached and a new task needs to run, the least recently used idle container is automatically stopped to make room. Containers that are actively processing tasks are never evicted. Configuration: - MAX_RUNNING_CONTAINERS: Set to limit (default: 0 = no limit) Closes #79 --- CLAUDE.md | 8 +- packages/webhook/devs_webhook/config.py | 6 + .../devs_webhook/core/container_pool.py | 126 ++++++++++++++++-- 3 files changed, 128 insertions(+), 12 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index e06e1e6..459bcf5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -328,6 +328,7 @@ devs start eamonn --env DEBUG=false --env NEW_VAR=test - `CONTAINER_MAX_AGE_HOURS`: Maximum container age in hours - containers older than this are cleaned up when idle (default: 10) - `CLEANUP_CHECK_INTERVAL_SECONDS`: How often to check for idle/old containers (default: 60) - `MAX_CONCURRENT_TASKS`: Maximum parallel tasks (default: 3) +- `MAX_RUNNING_CONTAINERS`: Maximum number of containers that can be running at once across all repos (default: 0 = no limit). When this limit is reached, the least recently used idle container is stopped to make room for new tasks. This helps limit resource usage when working with multiple repositories. **Access Control**: - `ALLOWED_ORGS`: Comma-separated GitHub organizations @@ -487,12 +488,13 @@ devs-webhook-worker --container-name eamonn --task-json-stdin < task.json ### Container Lifecycle Management -Containers are automatically managed with cleanup based on idle time and age: +Containers are automatically managed with cleanup based on idle time, age, and resource limits: 1. **Idle Cleanup**: Containers idle for longer than `CONTAINER_TIMEOUT_MINUTES` (default: 60 min) are stopped and cleaned up 2. **Age-Based Cleanup**: Containers older than `CONTAINER_MAX_AGE_HOURS` (default: 10 hours) are cleaned up when they become idle -3. **Graceful Shutdown**: On server shutdown (SIGTERM/SIGINT), all running containers are cleaned up -4. **Manual Stop**: Admin can force-stop containers via `POST /container/{name}/stop` endpoint +3. **Running Container Limit**: When `MAX_RUNNING_CONTAINERS` is set (default: 0 = no limit), the system automatically stops the least recently used idle container when the limit is reached and a new task needs to run. This prevents resource exhaustion when multiple repositories are in use. +4. **Graceful Shutdown**: On server shutdown (SIGTERM/SIGINT), all running containers are cleaned up +5. **Manual Stop**: Admin can force-stop containers via `POST /container/{name}/stop` endpoint **Key behaviors**: - Containers currently processing tasks are never interrupted by age-based cleanup diff --git a/packages/webhook/devs_webhook/config.py b/packages/webhook/devs_webhook/config.py index 859a94c..74d553b 100644 --- a/packages/webhook/devs_webhook/config.py +++ b/packages/webhook/devs_webhook/config.py @@ -80,6 +80,12 @@ def __init__(self, **kwargs): default=10, description="Maximum container age in hours (containers older than this are cleaned up when idle)" ) + max_running_containers: int = Field( + default=0, + description="Maximum number of containers that can be running at once across all repos. " + "When limit is reached, least recently used idle containers are stopped. " + "Set to 0 for no limit (default). Must not exceed the container pool size." + ) cleanup_check_interval_seconds: int = Field( default=60, description="How often to check for idle/old containers (in seconds)" diff --git a/packages/webhook/devs_webhook/core/container_pool.py b/packages/webhook/devs_webhook/core/container_pool.py index 254d1b6..b2b1579 100644 --- a/packages/webhook/devs_webhook/core/container_pool.py +++ b/packages/webhook/devs_webhook/core/container_pool.py @@ -49,6 +49,8 @@ def __init__(self, enable_cleanup_worker: bool = True): # Track running containers for idle cleanup self.running_containers: Dict[str, Dict[str, Any]] = {} + # Track which containers are actively processing a task (vs just idle/started) + self.actively_processing: set[str] = set() self._lock = asyncio.Lock() # Get container pools (main pool for AI tasks, optionally separate CI pool) @@ -573,6 +575,18 @@ async def _process_task_subprocess(self, dev_name: str, queued_task: QueuedTask) repo_name=repo_name, repo_path=str(repo_path)) + # Check if we need to evict an LRU container to stay under the limit + can_proceed = await self._evict_lru_container_if_needed(dev_name) + if not can_proceed: + # All containers are actively processing - this shouldn't happen often + # since the queue should prevent tasks from running when containers are busy + logger.warning("All containers actively processing, task will wait", + task_id=queued_task.task_id, + container=dev_name) + # Re-queue the task to retry later + await self.container_queues[dev_name].put(queued_task) + return + # Track container as running now = datetime.now(tz=timezone.utc) async with self._lock: @@ -588,6 +602,9 @@ async def _process_task_subprocess(self, dev_name: str, queued_task: QueuedTask) self.running_containers[dev_name]["last_used"] = now self.running_containers[dev_name]["repo_path"] = repo_path + # Mark as actively processing + self.actively_processing.add(dev_name) + try: # Get cached config or ensure it's loaded devs_options = self.get_repo_config(repo_name) @@ -837,10 +854,12 @@ async def _process_task_subprocess(self, dev_name: str, queued_task: QueuedTask) # Task execution failed, but we've logged it - don't re-raise finally: - # Update last_used timestamp after task completes (success or failure) + # Update last_used timestamp and clear actively_processing status async with self._lock: if dev_name in self.running_containers: self.running_containers[dev_name]["last_used"] = datetime.now(tz=timezone.utc) + # Mark as no longer actively processing + self.actively_processing.discard(dev_name) async def _checkout_default_branch(self, repo_name: str, repo_path: Path) -> None: """Checkout the default branch to ensure devcontainer files are from the right branch. @@ -1207,10 +1226,12 @@ async def get_status(self) -> Dict[str, Any]: }, "single_queue_assignments": self.single_queue_assignments.copy(), "cached_repo_configs": list(self.repo_configs.keys()), + "actively_processing": list(self.actively_processing), "cleanup_settings": { "idle_timeout_minutes": self.config.container_timeout_minutes, "max_age_hours": self.config.container_max_age_hours, "check_interval_seconds": self.config.cleanup_check_interval_seconds, + "max_running_containers": self.config.max_running_containers, }, } @@ -1369,7 +1390,7 @@ async def _idle_cleanup_worker(self) -> None: async def _cleanup_container(self, dev_name: str, repo_path: Path) -> None: """Clean up a container after use. - + Args: dev_name: Name of container to clean up repo_path: Path to repository on host @@ -1377,27 +1398,114 @@ async def _cleanup_container(self, dev_name: str, repo_path: Path) -> None: try: # Create project and managers for cleanup project = Project(repo_path) - + # Use the same config as the rest of the webhook handler workspace_manager = WorkspaceManager(project, self.config) container_manager = ContainerManager(project, self.config) - + # Stop container logger.info("Starting container stop", container=dev_name) stop_success = container_manager.stop_container(dev_name) logger.info("Container stop result", container=dev_name, success=stop_success) - + # Remove workspace logger.info("Starting workspace removal", container=dev_name) workspace_success = workspace_manager.remove_workspace(dev_name) logger.info("Workspace removal result", container=dev_name, success=workspace_success) - - logger.info("Container cleanup complete", + + logger.info("Container cleanup complete", container=dev_name, container_stopped=stop_success, workspace_removed=workspace_success) - + except Exception as e: logger.error("Container cleanup failed", container=dev_name, - error=str(e)) \ No newline at end of file + error=str(e)) + + async def _evict_lru_container_if_needed(self, requesting_container: str) -> bool: + """Evict the least recently used idle container if we're at the limit. + + This method enforces the max_running_containers limit by stopping idle + containers when needed to make room for new tasks. + + Args: + requesting_container: The container name that needs to start processing + + Returns: + True if we can proceed (either under limit, no limit set, or eviction succeeded), + False if we couldn't make room (all containers actively processing) + """ + max_running = self.config.max_running_containers + + # If no limit is set (0), always allow + if max_running <= 0: + return True + + async with self._lock: + # Count currently running containers (excluding the requesting one if not yet started) + current_running = len(self.running_containers) + + # If requesting container is already running, don't double-count + is_already_running = requesting_container in self.running_containers + + # Calculate how many would be running if we start this container + would_be_running = current_running if is_already_running else current_running + 1 + + # If we're under the limit, we're fine + if would_be_running <= max_running: + logger.debug("Under container limit", + current_running=current_running, + max_running=max_running, + requesting_container=requesting_container) + return True + + # Need to evict - find LRU idle container + # "Idle" means not in actively_processing set + lru_candidate = None + lru_last_used = None + + for dev_name, info in self.running_containers.items(): + # Skip the requesting container + if dev_name == requesting_container: + continue + + # Skip containers that are actively processing a task + if dev_name in self.actively_processing: + continue + + # Find the one with oldest last_used timestamp + if lru_candidate is None or info["last_used"] < lru_last_used: + lru_candidate = dev_name + lru_last_used = info["last_used"] + + if lru_candidate is None: + # All other containers are actively processing + logger.warning("Cannot evict any containers - all are actively processing", + current_running=current_running, + max_running=max_running, + actively_processing=list(self.actively_processing)) + return False + + # Evict the LRU container + logger.info("Evicting LRU container to make room", + evicting=lru_candidate, + last_used=lru_last_used.isoformat() if lru_last_used else "unknown", + requesting_container=requesting_container, + current_running=current_running, + max_running=max_running) + + repo_path = self.running_containers[lru_candidate]["repo_path"] + + # Cleanup must be done outside the lock to avoid deadlock + # Store info and remove from tracking first + del self.running_containers[lru_candidate] + + # Do the actual cleanup outside the lock + await self._cleanup_container(lru_candidate, repo_path) + + logger.info("LRU container evicted successfully", + evicted=lru_candidate, + requesting_container=requesting_container) + + return True \ No newline at end of file From 4405472598d485941648123988cc77ed34ed4a1d Mon Sep 17 00:00:00 2001 From: actflower Date: Thu, 15 Jan 2026 11:51:34 +0000 Subject: [PATCH 2/2] Fix mock config to include max_running_containers Add max_running_containers = 0 to the mock_config fixture to prevent TypeError when the eviction logic checks this config value during tests. --- packages/webhook/tests/test_single_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/webhook/tests/test_single_queue.py b/packages/webhook/tests/test_single_queue.py index 33b1b02..8fb183b 100644 --- a/packages/webhook/tests/test_single_queue.py +++ b/packages/webhook/tests/test_single_queue.py @@ -31,6 +31,7 @@ def mock_config(): config.get_container_pool_list.return_value = ["eamonn", "harry", "darren"] config.github_token = "test-token-1234567890" # Non-empty token config.container_timeout_minutes = 60 + config.max_running_containers = 0 # No limit by default # Create a real temp directory for repo_cache_dir temp_dir = tempfile.mkdtemp() config.repo_cache_dir = Path(temp_dir)