Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 157 additions & 153 deletions bin/ask
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ Examples:
from __future__ import annotations

import os
import shutil
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path

Expand Down Expand Up @@ -100,34 +98,90 @@ def _use_unified_daemon() -> bool:
return True # Default to unified daemon


def _maybe_start_unified_daemon() -> bool:
"""Try to start unified askd daemon if not running."""
import shutil
import time
import sys
from askd_runtime import state_file_path
from askd.daemon import ping_daemon

# Check if already running
state_file = state_file_path("askd.json")
if ping_daemon(timeout_s=0.5, state_file=state_file):
return True

# Find askd binary
candidates: list[str] = []
local = (Path(__file__).resolve().parent / "askd")
if local.exists():
candidates.append(str(local))
found = shutil.which("askd")
if found:
candidates.append(found)
if not candidates:
return False

# Prepare command with cross-platform handling
entry = candidates[0]
lower = entry.lower()
if lower.endswith((".cmd", ".bat", ".exe")):
argv = [entry]
else:
argv = [sys.executable, entry]

# Start daemon in background with platform-specific flags
try:
kwargs = {
"stdin": subprocess.DEVNULL,
"stdout": subprocess.DEVNULL,
"stderr": subprocess.DEVNULL,
"close_fds": True,
}
if os.name == "nt":
kwargs["creationflags"] = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
else:
kwargs["start_new_session"] = True
subprocess.Popen(argv, **kwargs)
except Exception:
return False

# Wait for daemon to be ready
deadline = time.time() + 2.0
while time.time() < deadline:
if ping_daemon(timeout_s=0.2, state_file=state_file):
return True
time.sleep(0.1)

return False


def _send_via_unified_daemon(
provider: str,
message: str,
timeout: float,
no_wrap: bool,
caller: str,
) -> int:
"""Send request via unified askd daemon."""
"""Send request via unified askd daemon with auto-start retry."""
import json
import socket

from askd_runtime import state_file_path
import askd_rpc

ready_timeout = min(timeout, 2.0) if timeout and timeout > 0 else 2.0
if not _ensure_unified_daemon_ready(timeout_s=ready_timeout):
print("[ERROR] Unified askd daemon not running", file=sys.stderr)
print("Start it with `askd` (or enable autostart via CCB_ASKD_AUTOSTART=1).", file=sys.stderr)
return EXIT_ERROR

# Use CCB_RUN_DIR (set by CCB startup) to locate the state file.
# This already contains the correct project-specific path.
state_file = state_file_path("askd.json")

state = askd_rpc.read_state(state_file)
if not state:
print("[ERROR] Unified askd daemon not running", file=sys.stderr)
return EXIT_ERROR
# Try to start daemon and retry once
if _maybe_start_unified_daemon():
state = askd_rpc.read_state(state_file)
if not state:
print("[ERROR] Unified askd daemon not running", file=sys.stderr)
return EXIT_ERROR

host = state.get("connect_host") or state.get("host") or "127.0.0.1"
port = int(state.get("port") or 0)
Expand Down Expand Up @@ -164,31 +218,74 @@ def _send_via_unified_daemon(
req["email_msg_id"] = os.environ.get("CCB_EMAIL_MSG_ID", "")
req["email_from"] = os.environ.get("CCB_EMAIL_FROM", "")

try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout + 10 if timeout > 0 else 3610)
sock.connect((host, port))
sock.sendall((json.dumps(req) + "\n").encode("utf-8"))

data = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
data += chunk
if b"\n" in data:
break

sock.close()
resp = json.loads(data.decode("utf-8").strip())
exit_code = int(resp.get("exit_code") or 0)
reply = resp.get("reply") or ""
if reply:
print(reply)
return exit_code
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR
# Try to send request, with one retry on connection failure only
request_sent = False
for attempt in range(2):
sock = None
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout + 10 if timeout > 0 else 3610)
sock.connect((host, port))

# Mark that connection succeeded - no retry after this point
request_sent = True

sock.sendall((json.dumps(req) + "\n").encode("utf-8"))

data = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
data += chunk
if b"\n" in data:
break

resp = json.loads(data.decode("utf-8").strip())
exit_code = int(resp.get("exit_code") or 0)
reply = resp.get("reply") or ""
if reply:
print(reply)
return exit_code
except (ConnectionRefusedError, ConnectionResetError) as e:
# Only retry if connection failed before request was sent
if attempt == 0 and not request_sent:
if _maybe_start_unified_daemon():
# Re-read state for new daemon
state = askd_rpc.read_state(state_file)
if state:
host = state.get("connect_host") or state.get("host") or "127.0.0.1"
port = int(state.get("port") or 0)
token = state.get("token") or ""
req["token"] = token
continue # Retry with new connection info
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR
except OSError as e:
# For other OS errors, only retry if connection not yet established
if attempt == 0 and not request_sent:
if _maybe_start_unified_daemon():
state = askd_rpc.read_state(state_file)
if state:
host = state.get("connect_host") or state.get("host") or "127.0.0.1"
port = int(state.get("port") or 0)
token = state.get("token") or ""
req["token"] = token
continue
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR
finally:
# Always close socket to prevent leaks
if sock:
try:
sock.close()
except Exception:
pass

return EXIT_ERROR


def _env_bool(name: str, default: bool = False) -> bool:
Expand All @@ -198,96 +295,6 @@ def _env_bool(name: str, default: bool = False) -> bool:
return val not in ("0", "false", "no", "off")


def _is_pid_alive(pid: int) -> bool:
if pid <= 0:
return False
try:
os.kill(pid, 0)
return True
except OSError:
return False
except Exception:
return True


def _askd_start_argv() -> list[str] | None:
local = script_dir / "askd"
candidates: list[str] = []
if local.exists():
candidates.append(str(local))
found = shutil.which("askd")
if found:
candidates.append(found)
if not candidates:
return None

entry = candidates[0]
lower = entry.lower()
if lower.endswith((".cmd", ".bat", ".exe")):
return [entry]
return [sys.executable, entry]


def _ensure_unified_daemon_ready(timeout_s: float = 2.0) -> bool:
if not _use_unified_daemon():
return True

from askd_runtime import state_file_path
import askd_rpc

state_file = state_file_path("askd.json")
try:
if askd_rpc.ping_daemon("ask", 0.2, state_file):
return True
except Exception:
pass

if not _env_bool("CCB_ASKD_AUTOSTART", True):
return False

argv = _askd_start_argv()
if not argv:
return False

env = os.environ.copy()
parent_raw = (env.get("CCB_PARENT_PID") or "").strip()
if parent_raw:
try:
parent_pid = int(parent_raw)
except Exception:
parent_pid = 0
if parent_pid <= 0 or not _is_pid_alive(parent_pid):
env.pop("CCB_PARENT_PID", None)
env.pop("CCB_MANAGED", None)

kwargs = {
"stdin": subprocess.DEVNULL,
"stdout": subprocess.DEVNULL,
"stderr": subprocess.DEVNULL,
"close_fds": True,
"env": env,
}
if os.name == "nt":
kwargs["creationflags"] = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
else:
kwargs["start_new_session"] = True

try:
subprocess.Popen(argv, **kwargs)
except Exception:
return False

deadline = time.time() + max(0.2, float(timeout_s))
while time.time() < deadline:
try:
if askd_rpc.ping_daemon("ask", 0.2, state_file):
return True
except Exception:
pass
time.sleep(0.1)
return False


def _default_foreground() -> bool:
# Allow explicit override
if _env_bool("CCB_ASK_BACKGROUND", False):
Expand Down Expand Up @@ -394,10 +401,15 @@ def main(argv: list[str]) -> int:
return EXIT_ERROR

# Notify mode: sync send, no wait for reply (used for hook notifications)
# MUST be checked before unified daemon path to avoid full request-response cycle
# which would cause reply-to-self loops via notify_completion -> ask -> daemon -> notify_completion
if notify_mode:
_require_caller()
if _use_unified_daemon():
# TODO: Add fire-and-forget RPC mode for unified daemon
# For now, disable unified mode for notify and use legacy path
print("[WARN] Notify mode not yet supported with unified daemon, using legacy", file=sys.stderr)
# Fall through to legacy path below

# Legacy daemon path for notify mode
cmd = [daemon_cmd, "--sync"]
if no_wrap:
cmd.append("--no-wrap")
Expand All @@ -416,33 +428,25 @@ def main(argv: list[str]) -> int:
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR

# Use unified daemon if enabled (default: True)
if _use_unified_daemon():
caller = _require_caller()
return _send_via_unified_daemon(provider, message, timeout, no_wrap, caller)

# Foreground mode: run provider directly (avoid background cleanup in managed envs)
# Foreground mode: run provider directly via unified daemon
if foreground_mode:
cmd = [daemon_cmd, "--sync", "--timeout", str(timeout)]
if no_wrap and provider == "claude":
cmd.append("--no-wrap")
env = os.environ.copy()
env["CCB_CALLER"] = _require_caller()
try:
result = subprocess.run(cmd, input=message, text=True, env=env)
return result.returncode
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR

# Default async mode: background task via nohup, using unified askd daemon
if _use_unified_daemon():
ready_timeout = min(timeout, 2.0) if timeout and timeout > 0 else 2.0
if not _ensure_unified_daemon_ready(timeout_s=ready_timeout):
print("[ERROR] Unified askd daemon not running", file=sys.stderr)
print("Start it with `askd` (or enable autostart via CCB_ASKD_AUTOSTART=1).", file=sys.stderr)
return EXIT_ERROR
if _use_unified_daemon():
caller = _require_caller()
return _send_via_unified_daemon(provider, message, timeout, no_wrap, caller)
else:
cmd = [daemon_cmd, "--sync", "--timeout", str(timeout)]
if no_wrap and provider == "claude":
cmd.append("--no-wrap")
env = os.environ.copy()
env["CCB_CALLER"] = _require_caller()
try:
result = subprocess.run(cmd, input=message, text=True, env=env)
return result.returncode
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
return EXIT_ERROR

# Default async mode: background task via nohup
task_id = make_task_id()
log_dir = Path(tempfile.gettempdir()) / "ccb-tasks"
log_dir.mkdir(parents=True, exist_ok=True)
Expand Down
Loading
Loading