diff --git a/bin/ask b/bin/ask index 21df402..228f76c 100755 --- a/bin/ask +++ b/bin/ask @@ -22,11 +22,9 @@ Examples: from __future__ import annotations import os -import shutil import subprocess import sys import tempfile -import time from datetime import datetime from pathlib import Path @@ -100,6 +98,64 @@ def _use_unified_daemon() -> bool: return True # Default to unified daemon +def _maybe_start_unified_daemon() -> bool: + """Try to start unified askd daemon if not running.""" + import shutil + import time + import sys + from askd_runtime import state_file_path + from askd.daemon import ping_daemon + + # Check if already running + state_file = state_file_path("askd.json") + if ping_daemon(timeout_s=0.5, state_file=state_file): + return True + + # Find askd binary + candidates: list[str] = [] + local = (Path(__file__).resolve().parent / "askd") + if local.exists(): + candidates.append(str(local)) + found = shutil.which("askd") + if found: + candidates.append(found) + if not candidates: + return False + + # Prepare command with cross-platform handling + entry = candidates[0] + lower = entry.lower() + if lower.endswith((".cmd", ".bat", ".exe")): + argv = [entry] + else: + argv = [sys.executable, entry] + + # Start daemon in background with platform-specific flags + try: + kwargs = { + "stdin": subprocess.DEVNULL, + "stdout": subprocess.DEVNULL, + "stderr": subprocess.DEVNULL, + "close_fds": True, + } + if os.name == "nt": + kwargs["creationflags"] = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0) + else: + kwargs["start_new_session"] = True + subprocess.Popen(argv, **kwargs) + except Exception: + return False + + # Wait for daemon to be ready + deadline = time.time() + 2.0 + while time.time() < deadline: + if ping_daemon(timeout_s=0.2, state_file=state_file): + return True + time.sleep(0.1) + + return False + + def _send_via_unified_daemon( provider: str, message: str, @@ -107,27 +163,25 @@ def _send_via_unified_daemon( no_wrap: bool, caller: str, ) -> int: - """Send request via unified askd daemon.""" + """Send request via unified askd daemon with auto-start retry.""" import json import socket from askd_runtime import state_file_path import askd_rpc - ready_timeout = min(timeout, 2.0) if timeout and timeout > 0 else 2.0 - if not _ensure_unified_daemon_ready(timeout_s=ready_timeout): - print("[ERROR] Unified askd daemon not running", file=sys.stderr) - print("Start it with `askd` (or enable autostart via CCB_ASKD_AUTOSTART=1).", file=sys.stderr) - return EXIT_ERROR - # Use CCB_RUN_DIR (set by CCB startup) to locate the state file. # This already contains the correct project-specific path. state_file = state_file_path("askd.json") state = askd_rpc.read_state(state_file) if not state: - print("[ERROR] Unified askd daemon not running", file=sys.stderr) - return EXIT_ERROR + # Try to start daemon and retry once + if _maybe_start_unified_daemon(): + state = askd_rpc.read_state(state_file) + if not state: + print("[ERROR] Unified askd daemon not running", file=sys.stderr) + return EXIT_ERROR host = state.get("connect_host") or state.get("host") or "127.0.0.1" port = int(state.get("port") or 0) @@ -164,31 +218,74 @@ def _send_via_unified_daemon( req["email_msg_id"] = os.environ.get("CCB_EMAIL_MSG_ID", "") req["email_from"] = os.environ.get("CCB_EMAIL_FROM", "") - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(timeout + 10 if timeout > 0 else 3610) - sock.connect((host, port)) - sock.sendall((json.dumps(req) + "\n").encode("utf-8")) - - data = b"" - while True: - chunk = sock.recv(4096) - if not chunk: - break - data += chunk - if b"\n" in data: - break - - sock.close() - resp = json.loads(data.decode("utf-8").strip()) - exit_code = int(resp.get("exit_code") or 0) - reply = resp.get("reply") or "" - if reply: - print(reply) - return exit_code - except Exception as e: - print(f"[ERROR] {e}", file=sys.stderr) - return EXIT_ERROR + # Try to send request, with one retry on connection failure only + request_sent = False + for attempt in range(2): + sock = None + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout + 10 if timeout > 0 else 3610) + sock.connect((host, port)) + + # Mark that connection succeeded - no retry after this point + request_sent = True + + sock.sendall((json.dumps(req) + "\n").encode("utf-8")) + + data = b"" + while True: + chunk = sock.recv(4096) + if not chunk: + break + data += chunk + if b"\n" in data: + break + + resp = json.loads(data.decode("utf-8").strip()) + exit_code = int(resp.get("exit_code") or 0) + reply = resp.get("reply") or "" + if reply: + print(reply) + return exit_code + except (ConnectionRefusedError, ConnectionResetError) as e: + # Only retry if connection failed before request was sent + if attempt == 0 and not request_sent: + if _maybe_start_unified_daemon(): + # Re-read state for new daemon + state = askd_rpc.read_state(state_file) + if state: + host = state.get("connect_host") or state.get("host") or "127.0.0.1" + port = int(state.get("port") or 0) + token = state.get("token") or "" + req["token"] = token + continue # Retry with new connection info + print(f"[ERROR] {e}", file=sys.stderr) + return EXIT_ERROR + except OSError as e: + # For other OS errors, only retry if connection not yet established + if attempt == 0 and not request_sent: + if _maybe_start_unified_daemon(): + state = askd_rpc.read_state(state_file) + if state: + host = state.get("connect_host") or state.get("host") or "127.0.0.1" + port = int(state.get("port") or 0) + token = state.get("token") or "" + req["token"] = token + continue + print(f"[ERROR] {e}", file=sys.stderr) + return EXIT_ERROR + except Exception as e: + print(f"[ERROR] {e}", file=sys.stderr) + return EXIT_ERROR + finally: + # Always close socket to prevent leaks + if sock: + try: + sock.close() + except Exception: + pass + + return EXIT_ERROR def _env_bool(name: str, default: bool = False) -> bool: @@ -198,96 +295,6 @@ def _env_bool(name: str, default: bool = False) -> bool: return val not in ("0", "false", "no", "off") -def _is_pid_alive(pid: int) -> bool: - if pid <= 0: - return False - try: - os.kill(pid, 0) - return True - except OSError: - return False - except Exception: - return True - - -def _askd_start_argv() -> list[str] | None: - local = script_dir / "askd" - candidates: list[str] = [] - if local.exists(): - candidates.append(str(local)) - found = shutil.which("askd") - if found: - candidates.append(found) - if not candidates: - return None - - entry = candidates[0] - lower = entry.lower() - if lower.endswith((".cmd", ".bat", ".exe")): - return [entry] - return [sys.executable, entry] - - -def _ensure_unified_daemon_ready(timeout_s: float = 2.0) -> bool: - if not _use_unified_daemon(): - return True - - from askd_runtime import state_file_path - import askd_rpc - - state_file = state_file_path("askd.json") - try: - if askd_rpc.ping_daemon("ask", 0.2, state_file): - return True - except Exception: - pass - - if not _env_bool("CCB_ASKD_AUTOSTART", True): - return False - - argv = _askd_start_argv() - if not argv: - return False - - env = os.environ.copy() - parent_raw = (env.get("CCB_PARENT_PID") or "").strip() - if parent_raw: - try: - parent_pid = int(parent_raw) - except Exception: - parent_pid = 0 - if parent_pid <= 0 or not _is_pid_alive(parent_pid): - env.pop("CCB_PARENT_PID", None) - env.pop("CCB_MANAGED", None) - - kwargs = { - "stdin": subprocess.DEVNULL, - "stdout": subprocess.DEVNULL, - "stderr": subprocess.DEVNULL, - "close_fds": True, - "env": env, - } - if os.name == "nt": - kwargs["creationflags"] = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0) - else: - kwargs["start_new_session"] = True - - try: - subprocess.Popen(argv, **kwargs) - except Exception: - return False - - deadline = time.time() + max(0.2, float(timeout_s)) - while time.time() < deadline: - try: - if askd_rpc.ping_daemon("ask", 0.2, state_file): - return True - except Exception: - pass - time.sleep(0.1) - return False - - def _default_foreground() -> bool: # Allow explicit override if _env_bool("CCB_ASK_BACKGROUND", False): @@ -394,10 +401,15 @@ def main(argv: list[str]) -> int: return EXIT_ERROR # Notify mode: sync send, no wait for reply (used for hook notifications) - # MUST be checked before unified daemon path to avoid full request-response cycle - # which would cause reply-to-self loops via notify_completion -> ask -> daemon -> notify_completion if notify_mode: _require_caller() + if _use_unified_daemon(): + # TODO: Add fire-and-forget RPC mode for unified daemon + # For now, disable unified mode for notify and use legacy path + print("[WARN] Notify mode not yet supported with unified daemon, using legacy", file=sys.stderr) + # Fall through to legacy path below + + # Legacy daemon path for notify mode cmd = [daemon_cmd, "--sync"] if no_wrap: cmd.append("--no-wrap") @@ -416,33 +428,25 @@ def main(argv: list[str]) -> int: print(f"[ERROR] {e}", file=sys.stderr) return EXIT_ERROR - # Use unified daemon if enabled (default: True) - if _use_unified_daemon(): - caller = _require_caller() - return _send_via_unified_daemon(provider, message, timeout, no_wrap, caller) - - # Foreground mode: run provider directly (avoid background cleanup in managed envs) + # Foreground mode: run provider directly via unified daemon if foreground_mode: - cmd = [daemon_cmd, "--sync", "--timeout", str(timeout)] - if no_wrap and provider == "claude": - cmd.append("--no-wrap") - env = os.environ.copy() - env["CCB_CALLER"] = _require_caller() - try: - result = subprocess.run(cmd, input=message, text=True, env=env) - return result.returncode - except Exception as e: - print(f"[ERROR] {e}", file=sys.stderr) - return EXIT_ERROR - - # Default async mode: background task via nohup, using unified askd daemon - if _use_unified_daemon(): - ready_timeout = min(timeout, 2.0) if timeout and timeout > 0 else 2.0 - if not _ensure_unified_daemon_ready(timeout_s=ready_timeout): - print("[ERROR] Unified askd daemon not running", file=sys.stderr) - print("Start it with `askd` (or enable autostart via CCB_ASKD_AUTOSTART=1).", file=sys.stderr) - return EXIT_ERROR + if _use_unified_daemon(): + caller = _require_caller() + return _send_via_unified_daemon(provider, message, timeout, no_wrap, caller) + else: + cmd = [daemon_cmd, "--sync", "--timeout", str(timeout)] + if no_wrap and provider == "claude": + cmd.append("--no-wrap") + env = os.environ.copy() + env["CCB_CALLER"] = _require_caller() + try: + result = subprocess.run(cmd, input=message, text=True, env=env) + return result.returncode + except Exception as e: + print(f"[ERROR] {e}", file=sys.stderr) + return EXIT_ERROR + # Default async mode: background task via nohup task_id = make_task_id() log_dir = Path(tempfile.gettempdir()) / "ccb-tasks" log_dir.mkdir(parents=True, exist_ok=True) diff --git a/ccb b/ccb index 40d01dc..60bc0b5 100755 --- a/ccb +++ b/ccb @@ -21,6 +21,7 @@ import re import shutil import posixpath import shlex +import threading from pathlib import Path script_dir = Path(__file__).resolve().parent @@ -51,9 +52,9 @@ backend_env = get_backend_env() if backend_env and not os.environ.get("CCB_BACKEND_ENV"): os.environ["CCB_BACKEND_ENV"] = backend_env -VERSION = "5.2.6" -GIT_COMMIT = "v5.2.6" -GIT_DATE = "2026-02-24" +VERSION = "5.2.4" +GIT_COMMIT = "c539e79" +GIT_DATE = "2026-02-25" _WIN_DRIVE_RE = re.compile(r"^[A-Za-z]:([/\\\\]|$)") _MNT_DRIVE_RE = re.compile(r"^/mnt/([A-Za-z])/(.*)$") @@ -384,7 +385,14 @@ def _is_pid_alive(pid: int) -> bool: try: os.kill(int(pid), 0) return True + except ProcessLookupError: + # Process doesn't exist + return False + except PermissionError: + # Process exists but no permission to check + return True except Exception: + # Other errors - assume dead for safety return False @@ -570,6 +578,10 @@ class AILauncher: self.runtime_dir.mkdir(parents=True, exist_ok=True) self._cleaned = False self._askd_checked = False + self._watchdog_thread = None + self._watchdog_stop_event = None + self._daemon_proc = None # Track daemon Popen object for reaping + self._daemon_proc_lock = threading.Lock() # Protect daemon_proc access self.terminal_type = self._detect_terminal_type() self.tmux_sessions = {} self.tmux_panes = {} @@ -737,6 +749,24 @@ class AILauncher: def _maybe_start_caskd(self) -> None: self._maybe_start_provider_daemon("codex") + def _maybe_start_unified_askd(self) -> None: + """Start unified askd daemon (provider-agnostic).""" + # Try to start for any enabled provider that uses askd (including claude) + for provider in ["codex", "gemini", "opencode", "droid", "claude"]: + if provider in [p.lower() for p in self.providers]: + # Try to start and check if successful + self._maybe_start_provider_daemon(provider) + # Verify daemon actually started by pinging + try: + from askd_runtime import state_file_path + from askd.daemon import ping_daemon + state_file = state_file_path("askd.json") + if ping_daemon(timeout_s=0.5, state_file=state_file): + return # Successfully started + except Exception: + pass + # If not successful, continue to next provider + def _maybe_start_provider_daemon(self, provider: str) -> None: def _bool_from_env(name: str): raw = os.environ.get(name) @@ -804,12 +834,45 @@ class AILauncher: if spec.daemon_bin_name == "askd" and not self._askd_checked: self._askd_checked = True if not _owned_by_ccb(st): - print("⚠️ askd already running but not managed by this CCB; restarting to bind lifecycle.") - if callable(shutdown_daemon_fn): + # Check if forced rebind is enabled (case-insensitive) + force_rebind = _env_bool("CCB_FORCE_REBIND", False) + + # Check if foreign parent is still alive before forcing rebind + # Safely normalize parent_pid to int (handle None, non-int, etc.) + try: + foreign_parent_pid = int((st or {}).get("parent_pid") or 0) + except Exception: + foreign_parent_pid = 0 + + foreign_parent_alive = False + if not force_rebind and foreign_parent_pid > 0: try: - shutdown_daemon_fn(timeout_s=1.0, state_file=state_file) + foreign_parent_alive = _is_pid_alive(foreign_parent_pid) except Exception: - pass + foreign_parent_alive = False + + if force_rebind or not foreign_parent_alive: + # Safe to rebind: either forced or foreign parent is dead/stale + if force_rebind: + print(f"⚠️ CCB_FORCE_REBIND=1 set, forcing askd rebind despite live parent (PID {foreign_parent_pid})...") + else: + print(f"⚠️ askd owned by dead parent (PID {foreign_parent_pid}), restarting to bind lifecycle...") + if callable(shutdown_daemon_fn): + try: + shutdown_daemon_fn(timeout_s=1.0, state_file=state_file) + except Exception: + pass + else: + # Foreign parent is still alive, don't force rebind + print(f"⚠️ askd owned by live parent (PID {foreign_parent_pid}), skipping rebind to avoid disruption") + print(f" Set CCB_FORCE_REBIND=1 to override this safety check") + host = st.get("host") if isinstance(st, dict) else None + port = st.get("port") if isinstance(st, dict) else None + if host and port: + print(f"✅ {spec.daemon_bin_name} already running at {host}:{port}") + else: + print(f"✅ {spec.daemon_bin_name} already running") + return deadline = time.time() + 2.0 while time.time() < deadline: if not ping_daemon(timeout_s=0.2, state_file=state_file): @@ -860,7 +923,11 @@ class AILauncher: try: env = os.environ.copy() env["CCB_PARENT_PID"] = str(os.getpid()) - subprocess.Popen([sys.executable, str(daemon_script)], env=env, **kwargs) + proc = subprocess.Popen([sys.executable, str(daemon_script)], env=env, **kwargs) + # Track daemon process for reaping (thread-safe) + if spec.daemon_bin_name == "askd": + with self._daemon_proc_lock: + self._daemon_proc = proc except Exception as exc: print(f"⚠️ Failed to start {spec.daemon_bin_name}: {exc}") return @@ -879,6 +946,144 @@ class AILauncher: time.sleep(0.1) print(f"⚠️ {spec.daemon_bin_name} start requested, but daemon not reachable yet") + def _start_daemon_watchdog(self) -> None: + """Start watchdog thread to monitor askd daemon health.""" + import threading + + # Restart if thread exists but is dead + if self._watchdog_thread is not None: + if not self._watchdog_thread.is_alive(): + self._watchdog_thread = None + self._watchdog_stop_event = None + else: + return # Already running + + self._watchdog_stop_event = threading.Event() + self._watchdog_thread = threading.Thread( + target=self._daemon_watchdog_loop, + daemon=True, + name="askd-watchdog" + ) + self._watchdog_thread.start() + + def _daemon_watchdog_loop(self) -> None: + """Watchdog loop to monitor and restart askd daemon if needed.""" + from askd_runtime import state_file_path + from askd.daemon import ping_daemon, read_state + + # Validate and clamp check interval + try: + check_interval = float(os.environ.get("CCB_WATCHDOG_INTERVAL_S", "10")) + check_interval = max(1.0, min(check_interval, 300.0)) # Clamp to 1-300 seconds + except (ValueError, TypeError): + check_interval = 10.0 + + consecutive_failures = 0 + max_failures = 3 + ownership_mismatch_count = 0 + max_ownership_mismatches = 3 + + while not self._watchdog_stop_event.wait(check_interval): + try: + # Reap zombie child processes + self._reap_zombie_children() + + # Check if tracked daemon process has exited + with self._daemon_proc_lock: + if self._daemon_proc is not None: + exit_code = self._daemon_proc.poll() + if exit_code is not None: + # Daemon process has exited + print(f"⚠️ askd daemon process exited with code {exit_code}", file=sys.stderr) + self._daemon_proc = None + # Will be restarted by health check below + + # Check askd daemon health + state_file = state_file_path("askd.json") + if not ping_daemon(timeout_s=0.5, state_file=state_file): + consecutive_failures += 1 + if consecutive_failures >= max_failures: + # Daemon is unhealthy, try to restart + print(f"⚠️ askd daemon unhealthy (failed {consecutive_failures} checks), attempting restart...", file=sys.stderr) + self._maybe_start_unified_askd() + consecutive_failures = 0 + else: + # Daemon is healthy, verify ownership + state = read_state(state_file=state_file) + if isinstance(state, dict): + parent_pid = int(state.get("parent_pid") or 0) + managed = bool(state.get("managed")) + if managed and parent_pid != self.ccb_pid: + ownership_mismatch_count += 1 + print(f"⚠️ askd daemon ownership mismatch (parent_pid={parent_pid}, expected={self.ccb_pid}, count={ownership_mismatch_count})", file=sys.stderr) + if ownership_mismatch_count >= max_ownership_mismatches: + # Check if forced rebind is enabled (case-insensitive) + force_rebind = _env_bool("CCB_FORCE_REBIND", False) + + # Check if foreign parent is still alive before forcing rebind + foreign_parent_alive = False + if not force_rebind: + # Use existing _is_pid_alive for consistent cross-platform check + try: + foreign_parent_alive = _is_pid_alive(parent_pid) + except Exception: + foreign_parent_alive = False + + if force_rebind or not foreign_parent_alive: + # Safe to rebind: either forced or foreign parent is dead/stale + if force_rebind: + print(f"⚠️ CCB_FORCE_REBIND=1 set, forcing rebind despite live parent (PID {parent_pid})...", file=sys.stderr) + else: + print(f"⚠️ Foreign parent (PID {parent_pid}) is dead, attempting to rebind daemon...", file=sys.stderr) + try: + from askd_rpc import shutdown_daemon + shutdown_daemon("ask", timeout_s=2.0, state_file=state_file) + time.sleep(1.0) # Wait for shutdown + except Exception: + pass + self._maybe_start_unified_askd() + ownership_mismatch_count = 0 + else: + # Foreign parent is still alive, don't force rebind + print(f"⚠️ Foreign parent (PID {parent_pid}) is still alive, skipping forced rebind to avoid disruption", file=sys.stderr) + print(f" Set CCB_FORCE_REBIND=1 to override this safety check", file=sys.stderr) + ownership_mismatch_count = 0 # Reset to avoid repeated warnings + else: + ownership_mismatch_count = 0 # Reset on correct ownership + consecutive_failures = 0 + except Exception as e: + # Silently continue on watchdog errors + pass + + def _reap_zombie_children(self) -> None: + """Reap tracked daemon process if it has exited.""" + # Only reap the tracked daemon process, not all children + with self._daemon_proc_lock: + if self._daemon_proc is not None: + try: + exit_code = self._daemon_proc.poll() + if exit_code is not None: + # Process has exited, reap it by calling wait() + try: + self._daemon_proc.wait(timeout=0.1) + except Exception: + pass + except Exception: + pass + + def _stop_daemon_watchdog(self) -> None: + """Stop watchdog thread.""" + if self._watchdog_stop_event: + self._watchdog_stop_event.set() + if self._watchdog_thread: + self._watchdog_thread.join(timeout=1.0) + # Only clear if thread actually stopped + if not self._watchdog_thread.is_alive(): + self._watchdog_thread = None + else: + # Thread still running, log warning + pass # Silently continue, daemon thread will exit with process + def _detect_terminal_type(self): # Forced by environment variable forced = (os.environ.get("CCB_TERMINAL") or os.environ.get("CODEX_TERMINAL") or "").strip().lower() @@ -1130,34 +1335,6 @@ class AILauncher: def _claude_session_file(self) -> Path: return self._project_session_file(".claude-session") - def _backfill_existing_claude_session_work_dir_fields(self) -> None: - """Backfill work_dir/work_dir_norm for an existing .claude-session on startup.""" - path = self._claude_session_file() - if not path.exists(): - return - - try: - raw = path.read_text(encoding="utf-8-sig") - data = json.loads(raw) - except Exception: - return - if not isinstance(data, dict): - return - - changed = False - if not isinstance(data.get("work_dir"), str) or not str(data.get("work_dir") or "").strip(): - data["work_dir"] = str(self.project_root) - changed = True - if not isinstance(data.get("work_dir_norm"), str) or not str(data.get("work_dir_norm") or "").strip(): - data["work_dir_norm"] = _normalize_path_for_match(str(self.project_root)) - changed = True - - if not changed: - return - - payload = json.dumps(data, ensure_ascii=False, indent=2) - safe_write_session(path, payload) - def _read_local_claude_session_id(self) -> str | None: data = self._read_json_file(self._claude_session_file()) sid = data.get("claude_session_id") @@ -2953,6 +3130,10 @@ class AILauncher: if self._cleaned: return self._cleaned = True + + # Stop watchdog thread first + self._stop_daemon_watchdog() + if not quiet: print(f"\n🧹 {t('cleaning_up')}") @@ -3011,10 +3192,47 @@ class AILauncher: except Exception: pass - # Ensure unified askd daemon exits when CCB exits. + # Ensure unified askd daemon exits when CCB exits (with ownership safety). try: askd_state = state_file_path("askd.json") - shutdown_daemon("ask", 1.0, askd_state) + + # Read askd state to check ownership + askd_st = {} + if askd_state and askd_state.exists(): + try: + with open(askd_state, "r", encoding="utf-8") as f: + askd_st = json.load(f) + except Exception: + pass + + # Check if we own this daemon or if forced shutdown is enabled + force_rebind = _env_bool("CCB_FORCE_REBIND", False) + owned_by_us = False + try: + parent_pid = int((askd_st or {}).get("parent_pid") or 0) + owned_by_us = (parent_pid == self.ccb_pid) + except Exception: + pass + + # Only shutdown if we own it, or force flag is set, or owner is dead + should_shutdown = False + if force_rebind: + should_shutdown = True + elif owned_by_us: + should_shutdown = True + else: + # Check if foreign owner is still alive + try: + parent_pid = int((askd_st or {}).get("parent_pid") or 0) + if parent_pid > 0: + foreign_alive = _is_pid_alive(parent_pid) + if not foreign_alive: + should_shutdown = True # Owner is dead, safe to cleanup + except Exception: + pass + + if should_shutdown: + shutdown_daemon("ask", 1.0, askd_state) except Exception: pass @@ -3057,11 +3275,6 @@ class AILauncher: if not self._require_project_config_dir(): return 2 - try: - self._backfill_existing_claude_session_work_dir_fields() - except Exception: - pass - if not self.providers: print("❌ No providers configured. Define providers in ccb.config or pass them on the command line.", file=sys.stderr) return 2 @@ -3186,6 +3399,9 @@ class AILauncher: if "codex" in self.providers and self.anchor_provider != "codex": self._maybe_start_caskd() + # Start watchdog thread to monitor daemon health (provider-agnostic) + self._start_daemon_watchdog() + try: try: self._sync_cend_registry() diff --git a/lib/askd/adapters/base.py b/lib/askd/adapters/base.py index bd418bc..484fda2 100644 --- a/lib/askd/adapters/base.py +++ b/lib/askd/adapters/base.py @@ -61,6 +61,8 @@ class QueuedTask: req_id: str done_event: threading.Event result: Optional[ProviderResult] = None + cancelled: bool = False # Cancellation flag for timeout/expiry + cancel_event: Optional[threading.Event] = None # Event for cooperative cancellation class BaseProviderAdapter(ABC): diff --git a/lib/askd/adapters/claude.py b/lib/askd/adapters/claude.py index 84a2006..67f36fc 100644 --- a/lib/askd/adapters/claude.py +++ b/lib/askd/adapters/claude.py @@ -546,11 +546,17 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: task, session, session_key, started_ms, log_reader, state, backend, pane_id, deadline ) result.reply = self._postprocess_reply(req, result.reply) - self._finalize_result(result, req) + self._finalize_result(result, req, task) return result - def _finalize_result(self, result: ProviderResult, req: ProviderRequest) -> None: + def _finalize_result(self, result: ProviderResult, req: ProviderRequest, task: QueuedTask) -> None: _write_log(f"[INFO] done provider=claude req_id={result.req_id} exit={result.exit_code}") + + # Skip completion hook for cancelled tasks + if task.cancelled: + _write_log(f"[INFO] Task cancelled, skipping completion hook: req_id={task.req_id}") + return + _write_log(f"[INFO] notify_completion caller={req.caller} done_seen={result.done_seen} email_req_id={req.email_req_id}") notify_completion( provider="claude", diff --git a/lib/askd/adapters/codex.py b/lib/askd/adapters/codex.py index 1655f98..09e2b25 100644 --- a/lib/askd/adapters/codex.py +++ b/lib/askd/adapters/codex.py @@ -12,7 +12,7 @@ from askd.adapters.base import BaseProviderAdapter, ProviderRequest, ProviderResult, QueuedTask from askd_runtime import log_path, write_log -from ccb_protocol import REQ_ID_PREFIX, is_done_text, strip_done_text, wrap_codex_prompt +from ccb_protocol import REQ_ID_PREFIX, is_done_text, strip_done_text, extract_reply_for_req, wrap_codex_prompt from caskd_session import CodexProjectSession, compute_session_key, load_project_session from codex_comm import CodexLogReader from completion_hook import notify_completion @@ -118,6 +118,11 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: done_ms: Optional[int] = None fallback_scan = False + # Idle timeout detection for degraded completion + idle_timeout = float(os.environ.get("CCB_CASKD_IDLE_TIMEOUT", "8.0")) + _last_reply_snapshot = "" + _last_reply_changed_at = time.time() + anchor_grace_deadline = min(deadline, time.time() + 1.5) if deadline else (time.time() + 1.5) anchor_collect_grace = min(deadline, time.time() + 2.0) if deadline else (time.time() + 2.0) rebounded = False @@ -127,6 +132,11 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: pane_check_interval = float(os.environ.get("CCB_CASKD_PANE_CHECK_INTERVAL", default_interval)) while True: + # Check for cancellation + if task.cancel_event and task.cancel_event.is_set(): + _write_log(f"[INFO] Task cancelled during wait loop: req_id={task.req_id}") + break + if deadline is not None: remaining = deadline - time.time() if remaining <= 0: @@ -163,14 +173,28 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: last_pane_check = time.time() event, state = reader.wait_for_event(state, wait_step) + + # Active session detection: check if reader switched to a newer session + current_log = reader.current_log_path() + if current_log and current_log != state.get("log_path"): + _write_log(f"[INFO] Active session switch detected: {current_log}, req_id={task.req_id}") + # Update state to follow new session + state = _tail_state_for_log(current_log, tail_bytes=tail_bytes) + fallback_scan = True + # Don't set rebounded=True to allow further rebinds if needed + + # Force rebind at grace deadline even if we have stale events + if (not rebounded) and (not anchor_seen) and time.time() >= anchor_grace_deadline and codex_session_id: + _write_log(f"[WARN] Anchor not seen by grace deadline, forcing rebind: req_id={task.req_id}") + codex_session_id = None + reader = CodexLogReader(log_path=preferred_log, session_id_filter=None, work_dir=Path(session.work_dir)) + log_hint = reader.current_log_path() + state = _tail_state_for_log(log_hint, tail_bytes=tail_bytes) + fallback_scan = True + rebounded = True + continue + if event is None: - if (not rebounded) and (not anchor_seen) and time.time() >= anchor_grace_deadline and codex_session_id: - codex_session_id = None - reader = CodexLogReader(log_path=preferred_log, session_id_filter=None, work_dir=Path(session.work_dir)) - log_hint = reader.current_log_path() - state = _tail_state_for_log(log_hint, tail_bytes=tail_bytes) - fallback_scan = True - rebounded = True continue role, text = event @@ -184,6 +208,8 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: if role != "assistant": continue + # Use grace window: allow collecting after grace period even without anchor + # (but prefer waiting for anchor during grace period) if (not anchor_seen) and time.time() < anchor_collect_grace: continue @@ -194,8 +220,40 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: done_ms = _now_ms() - started_ms break + # Idle-timeout: detect when Codex finished but forgot CCB_DONE + if combined != _last_reply_snapshot: + _last_reply_snapshot = combined + _last_reply_changed_at = time.time() + elif combined and (time.time() - _last_reply_changed_at >= idle_timeout): + _write_log( + f"[WARN] Codex reply idle for {idle_timeout}s without CCB_DONE, " + f"accepting as complete req_id={task.req_id}" + ) + done_seen = True + done_ms = _now_ms() - started_ms + break + combined = "\n".join(chunks) - reply = strip_done_text(combined, task.req_id) + reply = extract_reply_for_req(combined, task.req_id) + + # Fallback: if timeout but we have a reply with any CCB_DONE marker, + # accept it even if req_id doesn't match (degraded completion detection) + degraded_completion = False + if not done_seen and combined and "CCB_DONE:" in combined: + _write_log(f"[WARN] Found CCB_DONE but req_id mismatch for req_id={task.req_id}") + # Extract the mismatched req_id for logging + for line in combined.splitlines(): + if "CCB_DONE:" in line: + _write_log(f"[WARN] Expected: CCB_DONE: {task.req_id}, Found: {line.strip()}") + break + # Only accept if we have non-empty reply + if reply.strip(): + done_seen = True + degraded_completion = True + done_ms = _now_ms() - started_ms + else: + _write_log(f"[WARN] Degraded completion rejected: empty reply for req_id={task.req_id}") + codex_log_path = None try: lp = state.get("log_path") @@ -221,6 +279,11 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: f"anchor={result.anchor_seen} done={result.done_seen}" ) + # Skip completion hook for cancelled tasks + if task.cancelled: + _write_log(f"[INFO] Task cancelled, skipping completion hook: req_id={task.req_id}") + return result + # Log caller info before notify_completion _write_log(f"[INFO] notify_completion caller={req.caller} done_seen={done_seen}") diff --git a/lib/askd/adapters/droid.py b/lib/askd/adapters/droid.py index 9005284..6356579 100644 --- a/lib/askd/adapters/droid.py +++ b/lib/askd/adapters/droid.py @@ -129,6 +129,11 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: last_pane_check = time.time() while True: + # Check for cancellation + if task.cancel_event and task.cancel_event.is_set(): + _write_log(f"[INFO] Task cancelled during wait loop: req_id={task.req_id}") + break + if deadline is not None: remaining = deadline - time.time() if remaining <= 0: @@ -190,18 +195,22 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: combined = "\n".join(chunks) final_reply = extract_reply_for_req(combined, task.req_id) - notify_completion( - provider="droid", - output_file=req.output_path, - reply=final_reply, - req_id=task.req_id, - done_seen=done_seen, - caller=req.caller, - email_req_id=req.email_req_id, - email_msg_id=req.email_msg_id, - email_from=req.email_from, - work_dir=req.work_dir, - ) + # Skip completion hook for cancelled tasks + if not task.cancelled: + notify_completion( + provider="droid", + output_file=req.output_path, + reply=final_reply, + req_id=task.req_id, + done_seen=done_seen, + caller=req.caller, + email_req_id=req.email_req_id, + email_msg_id=req.email_msg_id, + email_from=req.email_from, + work_dir=req.work_dir, + ) + else: + _write_log(f"[INFO] Task cancelled, skipping completion hook: req_id={task.req_id}") result = ProviderResult( exit_code=0 if done_seen else 2, diff --git a/lib/askd/adapters/gemini.py b/lib/askd/adapters/gemini.py index 55589cd..2238e97 100644 --- a/lib/askd/adapters/gemini.py +++ b/lib/askd/adapters/gemini.py @@ -177,6 +177,11 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: _last_reply_changed_at = time.time() while True: + # Check for cancellation + if task.cancel_event and task.cancel_event.is_set(): + _write_log(f"[INFO] Task cancelled during wait loop: req_id={task.req_id}") + break + if deadline is not None: remaining = deadline - time.time() if remaining <= 0: @@ -262,22 +267,29 @@ def handle_task(self, task: QueuedTask) -> ProviderResult: if "CCB_DONE:" in line: _write_log(f"[WARN] Expected: CCB_DONE: {task.req_id}, Found: {line.strip()}") break - # Accept as completed in degraded mode - done_seen = True - done_ms = _now_ms() - started_ms - - notify_completion( - provider="gemini", - output_file=req.output_path, - reply=final_reply, - req_id=task.req_id, - done_seen=done_seen, - caller=req.caller, - email_req_id=req.email_req_id, - email_msg_id=req.email_msg_id, - email_from=req.email_from, - work_dir=req.work_dir, - ) + # Only accept if we have non-empty reply + if final_reply.strip(): + done_seen = True + done_ms = _now_ms() - started_ms + else: + _write_log(f"[WARN] Degraded completion rejected: empty reply for req_id={task.req_id}") + + # Skip completion hook for cancelled tasks + if not task.cancelled: + notify_completion( + provider="gemini", + output_file=req.output_path, + reply=final_reply, + req_id=task.req_id, + done_seen=done_seen, + caller=req.caller, + email_req_id=req.email_req_id, + email_msg_id=req.email_msg_id, + email_from=req.email_from, + work_dir=req.work_dir, + ) + else: + _write_log(f"[INFO] Task cancelled, skipping completion hook: req_id={task.req_id}") result = ProviderResult( exit_code=0 if done_seen else 2, diff --git a/lib/askd/adapters/opencode.py b/lib/askd/adapters/opencode.py index 2a08e00..091efa7 100644 --- a/lib/askd/adapters/opencode.py +++ b/lib/askd/adapters/opencode.py @@ -164,6 +164,11 @@ def _handle_task_locked(self, task: QueuedTask, session: Any, session_key: str, last_pane_check = time.time() while True: + # Check for cancellation + if task.cancel_event and task.cancel_event.is_set(): + _write_log(f"[INFO] Task cancelled during wait loop: req_id={task.req_id}") + break + if deadline is not None: remaining = deadline - time.time() if remaining <= 0: @@ -210,22 +215,29 @@ def _handle_task_locked(self, task: QueuedTask, session: Any, session_key: str, if "CCB_DONE:" in line: _write_log(f"[WARN] Expected: CCB_DONE: {task.req_id}, Found: {line.strip()}") break - # Accept as completed in degraded mode - done_seen = True - done_ms = _now_ms() - started_ms - - notify_completion( - provider="opencode", - output_file=req.output_path, - reply=final_reply, - req_id=task.req_id, - done_seen=done_seen, - caller=req.caller, - email_req_id=req.email_req_id, - email_msg_id=req.email_msg_id, - email_from=req.email_from, - work_dir=req.work_dir, - ) + # Only accept if we have non-empty reply + if final_reply.strip(): + done_seen = True + done_ms = _now_ms() - started_ms + else: + _write_log(f"[WARN] Degraded completion rejected: empty reply for req_id={task.req_id}") + + # Skip completion hook for cancelled tasks + if not task.cancelled: + notify_completion( + provider="opencode", + output_file=req.output_path, + reply=final_reply, + req_id=task.req_id, + done_seen=done_seen, + caller=req.caller, + email_req_id=req.email_req_id, + email_msg_id=req.email_msg_id, + email_from=req.email_from, + work_dir=req.work_dir, + ) + else: + _write_log(f"[INFO] Task cancelled, skipping completion hook: req_id={task.req_id}") result = ProviderResult( exit_code=0 if done_seen else 2, diff --git a/lib/askd/daemon.py b/lib/askd/daemon.py index 2b08f62..ac2e531 100644 --- a/lib/askd/daemon.py +++ b/lib/askd/daemon.py @@ -73,11 +73,14 @@ def submit(self, provider_key: str, request: ProviderRequest) -> Optional[Queued return None req_id = request.req_id or make_req_id() + cancel_event = threading.Event() task = QueuedTask( request=request, created_ms=_now_ms(), req_id=req_id, done_event=threading.Event(), + cancelled=False, + cancel_event=cancel_event, ) session = adapter.load_session(Path(request.work_dir)) @@ -187,6 +190,13 @@ def _handle_request(self, msg: dict) -> dict: task.done_event.wait(timeout=wait_timeout) result = task.result + # If timeout occurred and task is still running, mark it as cancelled + if not result and not task.done_event.is_set(): + _write_log(f"[WARN] Task timeout, marking as cancelled: provider={provider} req_id={task.req_id}") + task.cancelled = True + if task.cancel_event: + task.cancel_event.set() + if not result: return { "type": "ask.response", diff --git a/lib/askd_server.py b/lib/askd_server.py index 932a759..e64e996 100644 --- a/lib/askd_server.py +++ b/lib/askd_server.py @@ -86,6 +86,10 @@ def __init__( self.managed = env_managed if managed is None else bool(managed) if self.parent_pid: self.managed = True + self._heartbeat_thread = None + self._heartbeat_stop_event = None + self._started_at = None + self._state_write_lock = threading.Lock() # Serialize persistent state writes def serve_forever(self) -> int: run_dir().mkdir(parents=True, exist_ok=True) @@ -236,14 +240,32 @@ def _parent_monitor() -> None: actual_host, actual_port = httpd.server_address self._write_state(str(actual_host), int(actual_port)) + self._started_at = time.strftime("%Y-%m-%d %H:%M:%S") + self._write_persistent_state("running") + self._start_heartbeat_thread() write_log( log_path(self.spec.log_file_name), f"[INFO] {self.spec.daemon_key} started pid={os.getpid()} addr={actual_host}:{actual_port}", ) + + crashed = False + crash_reason = "" try: httpd.serve_forever(poll_interval=0.2) + except Exception as e: + # Unexpected crash during serve + crashed = True + crash_reason = f"Exception: {e}" + write_log(log_path(self.spec.log_file_name), f"[ERROR] {self.spec.daemon_key} crashed: {e}") + self._stop_heartbeat_thread() + self._write_persistent_state("crashed", crash_reason) + raise finally: write_log(log_path(self.spec.log_file_name), f"[INFO] {self.spec.daemon_key} stopped") + self._stop_heartbeat_thread() + # Only write stopped if not crashed + if not crashed: + self._write_persistent_state("stopped", "graceful shutdown") if self.on_stop: try: self.on_stop() @@ -277,3 +299,71 @@ def _write_state(self, host: str, port: int) -> None: os.chmod(self.state_file, 0o600) except Exception: pass + + def _write_persistent_state(self, status: str, exit_reason: str = "", exit_code: int = 0) -> None: + """Write persistent state to askd.last.json for debugging and observability.""" + with self._state_write_lock: # Serialize writes + last_state_file = self.state_file.parent / f"{self.state_file.stem}.last.json" + payload = { + "status": status, # running, stopping, stopped, crashed + "pid": os.getpid(), + "started_at": self._started_at, + "heartbeat_at": time.strftime("%Y-%m-%d %H:%M:%S"), + "work_dir": self.work_dir, + "parent_pid": int(self.parent_pid or 0) or None, + "managed": bool(self.managed), + } + if status in ("stopped", "crashed"): + payload["stopped_at"] = time.strftime("%Y-%m-%d %H:%M:%S") + if exit_reason: + payload["exit_reason"] = exit_reason + if exit_code: + payload["exit_code"] = exit_code + try: + last_state_file.parent.mkdir(parents=True, exist_ok=True) + safe_write_session(last_state_file, json.dumps(payload, ensure_ascii=False, indent=2) + "\n") + except Exception: + pass + + def _start_heartbeat_thread(self) -> None: + """Start heartbeat thread to periodically update state file.""" + # Restart if thread exists but is dead + if self._heartbeat_thread is not None: + if not self._heartbeat_thread.is_alive(): + self._heartbeat_thread = None + self._heartbeat_stop_event = None + else: + return # Already running + + self._heartbeat_stop_event = threading.Event() + self._heartbeat_thread = threading.Thread( + target=self._heartbeat_loop, + daemon=True, + name="askd-heartbeat" + ) + self._heartbeat_thread.start() + + def _heartbeat_loop(self) -> None: + """Periodically update heartbeat_at in state file.""" + # Validate and clamp interval + try: + interval = float(os.environ.get("CCB_HEARTBEAT_INTERVAL_S", "2")) + interval = max(0.5, min(interval, 60.0)) # Clamp to 0.5-60 seconds + except (ValueError, TypeError): + interval = 2.0 + + while not self._heartbeat_stop_event.wait(interval): + try: + self._write_persistent_state("running") + except Exception: + pass + + def _stop_heartbeat_thread(self) -> None: + """Stop heartbeat thread.""" + if self._heartbeat_stop_event: + self._heartbeat_stop_event.set() + if self._heartbeat_thread: + self._heartbeat_thread.join(timeout=0.5) + # Only clear if thread actually stopped + if not self._heartbeat_thread.is_alive(): + self._heartbeat_thread = None diff --git a/lib/ccb_protocol.py b/lib/ccb_protocol.py index 213639f..3fbe356 100644 --- a/lib/ccb_protocol.py +++ b/lib/ccb_protocol.py @@ -99,6 +99,54 @@ def strip_done_text(text: str, req_id: str) -> str: return "\n".join(lines).rstrip() +def extract_reply_for_req(text: str, req_id: str) -> str: + """ + Extract the reply segment for req_id from a message. + + When multiple replies are present (each ending with CCB_DONE: ), + extract only the segment between the previous done line and the done line + for our req_id. This prevents mixing old/stale content into the current reply. + """ + lines = [ln.rstrip("\n") for ln in (text or "").splitlines()] + if not lines: + return "" + + # Find all done-line indices and target req_id indices + target_re = re.compile(rf"^\s*CCB_DONE:\s*{re.escape(req_id)}\s*$", re.IGNORECASE) + done_idxs = [i for i, ln in enumerate(lines) if _ANY_CCB_DONE_LINE_RE.match(ln or "")] + target_idxs = [i for i in done_idxs if target_re.match(lines[i] or "")] + + if not target_idxs: + # No CCB_DONE for our req_id found + # If there are other CCB_DONE markers, this is likely old content - return empty + # to avoid mixing old replies into current request + if done_idxs: + return "" # Prevent returning old content + # No CCB_DONE markers at all - fallback to strip behavior + return strip_done_text(text, req_id) + + # Find the last occurrence of our req_id's done line + target_i = target_idxs[-1] + + # Find the previous done line (any req_id) + prev_done_i = -1 + for i in reversed(done_idxs): + if i < target_i: + prev_done_i = i + break + + # Extract segment between previous done and our done + segment = lines[prev_done_i + 1 : target_i] + + # Trim leading/trailing blank lines + while segment and segment[0].strip() == "": + segment = segment[1:] + while segment and segment[-1].strip() == "": + segment = segment[:-1] + + return "\n".join(segment).rstrip() + + @dataclass(frozen=True) class CaskdRequest: client_id: str diff --git a/lib/codex_comm.py b/lib/codex_comm.py index 4d1bab9..c66f1ee 100644 --- a/lib/codex_comm.py +++ b/lib/codex_comm.py @@ -270,8 +270,34 @@ def _scan_latest(self) -> Optional[Path]: def _latest_log(self) -> Optional[Path]: preferred = self._preferred_log if preferred and preferred.exists(): - # If we're bound to a specific session id, prefer that log without cross-session scanning. + # If we're bound to a specific session id, check if there's a newer session if self._session_id_filter: + # Periodically check for newer sessions (every 10 seconds) + now = time.time() + if not hasattr(self, '_last_session_check'): + self._last_session_check = now + + if now - self._last_session_check >= 10.0: + self._last_session_check = now + # Temporarily disable filter to scan all sessions + old_filter = self._session_id_filter + self._session_id_filter = None + latest = self._scan_latest() + self._session_id_filter = old_filter + + if latest and latest != preferred: + try: + preferred_mtime = preferred.stat().st_mtime + latest_mtime = latest.stat().st_mtime + if latest_mtime > preferred_mtime + 5.0: # At least 5s newer + self._debug(f"[Active Detection] Found newer session, switching: {latest}") + self._preferred_log = latest + # Clear filter to allow reading new session + self._session_id_filter = None + return latest + except OSError: + pass + self._debug(f"Using preferred log (bound): {preferred}") return preferred diff --git a/lib/completion_hook.py b/lib/completion_hook.py index 11fc997..5300973 100644 --- a/lib/completion_hook.py +++ b/lib/completion_hook.py @@ -32,6 +32,7 @@ def _run_hook_async( email_msg_id: str = "", email_from: str = "", work_dir: str = "", + done_seen: bool = True, ) -> None: """Run the completion hook in a background thread.""" if not env_bool("CCB_COMPLETION_HOOK_ENABLED", True): @@ -75,6 +76,7 @@ def _run(): # Set up environment with caller and email-related vars env = os.environ.copy() env["CCB_CALLER"] = caller # Ensure caller is passed via env var + env["CCB_DONE_SEEN"] = "1" if done_seen else "0" # Pass completion status if email_req_id: env["CCB_EMAIL_REQ_ID"] = email_req_id if email_msg_id: @@ -99,7 +101,8 @@ def _run(): thread = threading.Thread(target=_run, daemon=False) thread.start() - thread.join(timeout=65) # Wait for hook to complete (match subprocess timeout + buffer) + # Wait briefly to ensure hook starts, but don't block worker for full duration + thread.join(timeout=2.0) def notify_completion( @@ -129,7 +132,7 @@ def notify_completion( email_from: Original sender email address (for email caller) work_dir: Working directory for session file lookup """ - if not done_seen: - return - - _run_hook_async(provider, output_file, reply, req_id, caller, email_req_id, email_msg_id, email_from, work_dir) + # Always notify completion, even if done_seen=False + # Let the hook receiver decide how to handle incomplete/timeout cases + # This prevents "processing forever" when CCB_DONE marker is missing/mismatched + _run_hook_async(provider, output_file, reply, req_id, caller, email_req_id, email_msg_id, email_from, work_dir, done_seen) diff --git a/lib/daskd_protocol.py b/lib/daskd_protocol.py index 57acefd..10a7ead 100644 --- a/lib/daskd_protocol.py +++ b/lib/daskd_protocol.py @@ -87,6 +87,10 @@ def extract_reply_for_req(text: str, req_id: str) -> str: target_idxs = [i for i in done_idxs if target_re.match(lines[i] or "")] if not target_idxs: + # No CCB_DONE for our req_id found + # If there are other CCB_DONE markers, this is likely old content - return empty + if done_idxs: + return "" # Prevent returning old content return strip_done_text(text, req_id) target_i = target_idxs[-1] diff --git a/lib/gaskd_protocol.py b/lib/gaskd_protocol.py index 26c4bfe..897f074 100644 --- a/lib/gaskd_protocol.py +++ b/lib/gaskd_protocol.py @@ -46,6 +46,10 @@ def extract_reply_for_req(text: str, req_id: str) -> str: target_idxs = [i for i in done_idxs if target_re.match(lines[i] or "")] if not target_idxs: + # No CCB_DONE for our req_id found + # If there are other CCB_DONE markers, this is likely old content - return empty + if done_idxs: + return "" # Prevent returning old content # Fallback: keep existing behavior (strip only if the last line matches). return strip_done_text(text, req_id) diff --git a/lib/memory/transfer.py b/lib/memory/transfer.py index c0eadf8..063d4ab 100644 --- a/lib/memory/transfer.py +++ b/lib/memory/transfer.py @@ -299,14 +299,13 @@ def _extract_from_codex( session_id: Optional[str] = None, ) -> TransferContext: session_file, data = self._load_session_data("codex") + # Only use current active session, not old_* fallback to prevent "amnesia" log_path = session_path or ( data.get("codex_session_path") - or data.get("old_codex_session_path") or data.get("session_path") ) session_id = session_id or ( data.get("codex_session_id") - or data.get("old_codex_session_id") or data.get("session_id") or "" ) @@ -353,13 +352,13 @@ def _extract_from_gemini( session_id: Optional[str] = None, ) -> TransferContext: session_file, data = self._load_session_data("gemini") + # Only use current active session, not old_* fallback to prevent "amnesia" session_id = session_id or ( data.get("gemini_session_id") - or data.get("old_gemini_session_id") or data.get("session_id") or "" ) - preferred_path = session_path or (data.get("gemini_session_path") or data.get("old_gemini_session_path")) + preferred_path = session_path or data.get("gemini_session_path") preferred_path_obj: Optional[Path] = None if preferred_path: try: @@ -401,13 +400,13 @@ def _extract_from_droid( session_id: Optional[str] = None, ) -> TransferContext: session_file, data = self._load_session_data("droid") + # Only use current active session, not old_* fallback to prevent "amnesia" session_id = session_id or ( data.get("droid_session_id") - or data.get("old_droid_session_id") or data.get("session_id") or "" ) - preferred_path = session_path or (data.get("droid_session_path") or data.get("old_droid_session_path")) + preferred_path = session_path or data.get("droid_session_path") preferred_path_obj: Optional[Path] = None if preferred_path: try: @@ -451,14 +450,14 @@ def _extract_from_opencode( project_id: Optional[str] = None, ) -> TransferContext: session_file, data = self._load_session_data("opencode") + # Only use current active session, not old_* fallback to prevent "amnesia" session_id = session_id or ( data.get("opencode_session_id") - or data.get("old_opencode_session_id") or data.get("opencode_storage_session_id") or data.get("session_id") or "" ) - project_id = project_id or (data.get("opencode_project_id") or data.get("old_opencode_project_id") or "") + project_id = project_id or data.get("opencode_project_id") or "" from opencode_comm import OpenCodeLogReader diff --git a/lib/terminal.py b/lib/terminal.py index 829c892..be64eaa 100644 --- a/lib/terminal.py +++ b/lib/terminal.py @@ -644,7 +644,9 @@ def send_text(self, pane_id: str, text: str) -> None: self._tmux_run(["send-keys", "-t", session, "-l", sanitized], check=True) self._tmux_run(["send-keys", "-t", session, "Enter"], check=True) return - buffer_name = f"ccb-tb-{os.getpid()}-{int(time.time() * 1000)}" + # Use random suffix to avoid collision under high concurrency + import random + buffer_name = f"ccb-tb-{os.getpid()}-{int(time.time() * 1000)}-{random.randint(1000, 9999)}" self._tmux_run(["load-buffer", "-b", buffer_name, "-"], check=True, input_bytes=sanitized.encode("utf-8")) try: self._tmux_run(["paste-buffer", "-t", session, "-b", buffer_name, "-p"], check=True) @@ -658,7 +660,9 @@ def send_text(self, pane_id: str, text: str) -> None: # Pane-oriented: bracketed paste + unique tmux buffer + cleanup self._ensure_not_in_copy_mode(pane_id) - buffer_name = f"ccb-tb-{os.getpid()}-{int(time.time() * 1000)}" + # Use random suffix to avoid collision under high concurrency + import random + buffer_name = f"ccb-tb-{os.getpid()}-{int(time.time() * 1000)}-{random.randint(1000, 9999)}" self._tmux_run(["load-buffer", "-b", buffer_name, "-"], check=True, input_bytes=sanitized.encode("utf-8")) try: self._tmux_run(["paste-buffer", "-p", "-t", pane_id, "-b", buffer_name], check=True) @@ -909,24 +913,22 @@ def _send_enter(self, pane_id: str) -> None: time.sleep(enter_delay) env_method_raw = os.environ.get("CCB_WEZTERM_ENTER_METHOD") - # Default behavior is intentionally unchanged on non-Windows platforms: - # previously we used `send-text` with a CR byte; keep that unless the user overrides. - default_method = "auto" if os.name == "nt" else "text" + # Default to "auto" (prefer key injection) on all platforms for better TUI compatibility + default_method = "auto" method = (env_method_raw or default_method).strip().lower() if method not in {"auto", "key", "text"}: method = default_method - # Retry mechanism for reliability (Windows native occasionally drops Enter) + # Retry mechanism for reliability max_retries = 3 for attempt in range(max_retries): - # Only enable "auto key" behavior by default on native Windows. - # Users can force key injection everywhere via CCB_WEZTERM_ENTER_METHOD=key. - if method == "key" or (method == "auto" and os.name == "nt"): + # Try key injection first (works better with raw-mode TUIs) + if method in {"key", "auto"}: if self._send_key_cli(pane_id, "Enter"): return # Fallback: send CR byte; works for shells/readline, but not for all raw-mode TUIs. - if method in {"auto", "text", "key"}: + if method in {"auto", "text"}: result = _run( [*self._cli_base_args(), "send-text", "--pane-id", pane_id, "--no-paste"], input=b"\r", diff --git a/lib/worker_pool.py b/lib/worker_pool.py index c3f9802..c09b270 100644 --- a/lib/worker_pool.py +++ b/lib/worker_pool.py @@ -36,6 +36,12 @@ def run(self) -> None: task = self._q.get(timeout=0.2) except queue.Empty: continue + + # Skip cancelled/expired tasks + if hasattr(task, 'cancelled') and task.cancelled: + task.done_event.set() + continue + try: task.result = self._handle_task(task) except Exception as exc: