diff --git a/agent.py b/agent.py index 7d904736..add91b7e 100644 --- a/agent.py +++ b/agent.py @@ -7,6 +7,7 @@ import asyncio import io +import os import re import sys from datetime import datetime, timedelta @@ -16,6 +17,8 @@ from claude_agent_sdk import ClaudeSDKClient +from structured_logging import get_logger + # Fix Windows console encoding for Unicode characters (emoji, etc.) # Without this, print() crashes when Claude outputs emoji like ✅ if sys.platform == "win32": @@ -40,6 +43,7 @@ async def run_agent_session( client: ClaudeSDKClient, message: str, project_dir: Path, + logger=None, ) -> tuple[str, str]: """ Run a single agent session using Claude Agent SDK. @@ -48,6 +52,7 @@ async def run_agent_session( client: Claude SDK client message: The prompt to send project_dir: Project directory path + logger: Optional structured logger for this session Returns: (status, response_text) where status is: @@ -55,6 +60,8 @@ async def run_agent_session( - "error" if an error occurred """ print("Sending prompt to Claude Agent SDK...\n") + if logger: + logger.info("Starting agent session", prompt_length=len(message)) try: # Send the query @@ -81,6 +88,8 @@ async def run_agent_session( print(f" Input: {input_str[:200]}...", flush=True) else: print(f" Input: {input_str}", flush=True) + if logger: + logger.debug("Tool used", tool_name=block.name, input_size=len(str(getattr(block, "input", "")))) # Handle UserMessage (tool results) elif msg_type == "UserMessage" and hasattr(msg, "content"): @@ -94,20 +103,29 @@ async def run_agent_session( # Check if command was blocked by security hook if "blocked" in str(result_content).lower(): print(f" [BLOCKED] {result_content}", flush=True) + if logger: + logger.error("Security: command blocked", content=str(result_content)[:200]) elif is_error: # Show errors (truncated) error_str = str(result_content)[:500] print(f" [Error] {error_str}", flush=True) + if logger: + logger.error("Tool execution error", error=error_str[:200]) else: # Tool 
succeeded - just show brief confirmation print(" [Done]", flush=True) print("\n" + "-" * 70 + "\n") + if logger: + logger.info("Agent session completed", response_length=len(response_text)) return "continue", response_text except Exception as e: - print(f"Error during agent session: {e}") - return "error", str(e) + error_str = str(e) + print(f"Error during agent session: {error_str}") + if logger: + logger.error("Agent session error", error_type=type(e).__name__, message=error_str[:200]) + return "error", error_str async def run_autonomous_agent( @@ -131,6 +149,27 @@ async def run_autonomous_agent( agent_type: Type of agent: "initializer", "coding", "testing", or None (auto-detect) testing_feature_id: For testing agents, the pre-claimed feature ID to test """ + # Initialize structured logger for this agent session + # Agent ID format: "initializer", "coding-", "testing-" + if agent_type == "testing": + agent_id = f"testing-{os.getpid()}" + elif feature_id: + agent_id = f"coding-{feature_id}" + elif agent_type == "initializer": + agent_id = "initializer" + else: + agent_id = "coding-main" + + logger = get_logger(project_dir, agent_id=agent_id, console_output=False) + logger.info( + "Autonomous agent started", + agent_type=agent_type or "auto-detect", + model=model, + yolo_mode=yolo_mode, + max_iterations=max_iterations, + feature_id=feature_id, + ) + print("\n" + "=" * 70) print(" AUTONOMOUS CODING AGENT") print("=" * 70) @@ -192,6 +231,7 @@ async def run_autonomous_agent( if not is_initializer and iteration == 1: passing, in_progress, total = count_passing_tests(project_dir) if total > 0 and passing == total: + logger.info("Project complete on startup", passing=passing, total=total) print("\n" + "=" * 70) print(" ALL FEATURES ALREADY COMPLETE!") print("=" * 70) @@ -208,15 +248,14 @@ async def run_autonomous_agent( print_session_header(iteration, is_initializer) # Create client (fresh context) - # Pass agent_id for browser isolation in multi-agent scenarios - import 
os + # Pass client_agent_id for browser isolation in multi-agent scenarios if agent_type == "testing": - agent_id = f"testing-{os.getpid()}" # Unique ID for testing agents + client_agent_id = f"testing-{os.getpid()}" # Unique ID for testing agents elif feature_id: - agent_id = f"feature-{feature_id}" + client_agent_id = f"feature-{feature_id}" else: - agent_id = None - client = create_client(project_dir, model, yolo_mode=yolo_mode, agent_id=agent_id) + client_agent_id = None + client = create_client(project_dir, model, yolo_mode=yolo_mode, agent_id=client_agent_id) # Choose prompt based on agent type if agent_type == "initializer": @@ -234,9 +273,10 @@ async def run_autonomous_agent( # Wrap in try/except to handle MCP server startup failures gracefully try: async with client: - status, response = await run_agent_session(client, prompt, project_dir) + status, response = await run_agent_session(client, prompt, project_dir, logger=logger) except Exception as e: print(f"Client/MCP server error: {e}") + logger.error("Client/MCP server error", error_type=type(e).__name__, message=str(e)[:200]) # Don't crash - return error status so the loop can retry status, response = "error", str(e) @@ -255,6 +295,7 @@ async def run_autonomous_agent( if "limit reached" in response.lower(): print("Claude Agent SDK indicated limit reached.") + logger.warn("Rate limit signal in response") # Try to parse reset time from response match = re.search( @@ -329,6 +370,7 @@ async def run_autonomous_agent( elif status == "error": print("\nSession encountered an error") print("Will retry with a fresh session...") + logger.error("Session error, retrying", delay_seconds=AUTO_CONTINUE_DELAY_SECONDS) await asyncio.sleep(AUTO_CONTINUE_DELAY_SECONDS) # Small delay between sessions @@ -337,6 +379,14 @@ async def run_autonomous_agent( await asyncio.sleep(1) # Final summary + passing, in_progress, total = count_passing_tests(project_dir) + logger.info( + "Agent session complete", + iterations=iteration, + 
passing=passing, + in_progress=in_progress, + total=total, + ) print("\n" + "=" * 70) print(" SESSION COMPLETE") print("=" * 70) diff --git a/autonomous_agent_demo.py b/autonomous_agent_demo.py index 16702f5e..702f696c 100644 --- a/autonomous_agent_demo.py +++ b/autonomous_agent_demo.py @@ -46,6 +46,7 @@ from agent import run_autonomous_agent from registry import DEFAULT_MODEL, get_project_path +from structured_logging import get_logger def parse_args() -> argparse.Namespace: @@ -193,6 +194,17 @@ def main() -> None: print("Use an absolute path or register the project first.") return + # Initialize logger now that project_dir is resolved + logger = get_logger(project_dir, agent_id="entry-point", console_output=False) + logger.info( + "Script started", + input_path=project_dir_input, + resolved_path=str(project_dir), + agent_type=args.agent_type, + concurrency=args.concurrency, + yolo_mode=args.yolo, + ) + try: if args.agent_type: # Subprocess mode - spawned by orchestrator for a specific role @@ -228,8 +240,10 @@ def main() -> None: except KeyboardInterrupt: print("\n\nInterrupted by user") print("To resume, run the same command again") + logger.info("Interrupted by user") except Exception as e: print(f"\nFatal error: {e}") + logger.error("Fatal error", error_type=type(e).__name__, message=str(e)[:200]) raise diff --git a/client.py b/client.py index 7ea04a5e..7e775807 100644 --- a/client.py +++ b/client.py @@ -16,6 +16,7 @@ from dotenv import load_dotenv from security import bash_security_hook +from structured_logging import get_logger # Load environment variables from .env file if present load_dotenv() @@ -179,6 +180,9 @@ def create_client( Note: Authentication is handled by start.bat/start.sh before this runs. 
The Claude SDK auto-detects credentials from the Claude CLI configuration """ + # Initialize logger for client configuration events + logger = get_logger(project_dir, agent_id="client", console_output=False) + # Build allowed tools list based on mode # In YOLO mode, exclude Playwright tools for faster prototyping allowed_tools = [*BUILTIN_TOOLS, *FEATURE_MCP_TOOLS] @@ -225,6 +229,7 @@ def create_client( with open(settings_file, "w") as f: json.dump(security_settings, f, indent=2) + logger.info("Settings file written", file_path=str(settings_file)) print(f"Created security settings at {settings_file}") print(" - Sandbox enabled (OS-level bash isolation)") print(f" - Filesystem restricted to: {project_dir.resolve()}") @@ -300,6 +305,7 @@ def create_client( if sdk_env: print(f" - API overrides: {', '.join(sdk_env.keys())}") + logger.info("API overrides configured", is_ollama=is_ollama, overrides=list(sdk_env.keys())) if is_ollama: print(" - Ollama Mode: Using local models") elif "ANTHROPIC_BASE_URL" in sdk_env: @@ -352,6 +358,16 @@ async def pre_compact_hook( # } return SyncHookJSONOutput() + # Log client creation + logger.info( + "Client created", + model=model, + yolo_mode=yolo_mode, + agent_id=agent_id, + is_alternative_api=is_alternative_api, + max_turns=1000, + ) + return ClaudeSDKClient( options=ClaudeAgentOptions( model=model, diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 486b9635..fc47d214 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -31,6 +31,7 @@ from api.dependency_resolver import are_dependencies_satisfied, compute_scheduling_scores from progress import has_features from server.utils.process_utils import kill_process_tree +from structured_logging import get_logger # Root directory of autocoder (where this script and autonomous_agent_demo.py live) AUTOCODER_ROOT = Path(__file__).parent.resolve() @@ -192,6 +193,16 @@ def __init__( # Database session for this orchestrator self._engine, self._session_maker = 
create_database(project_dir) + # Structured logger for persistent logs (saved to {project_dir}/.autocoder/logs.db) + # Uses console_output=False since orchestrator already has its own print statements + self._logger = get_logger(project_dir, agent_id="orchestrator", console_output=False) + self._logger.info( + "Orchestrator initialized", + max_concurrency=self.max_concurrency, + yolo_mode=yolo_mode, + testing_agent_ratio=testing_agent_ratio, + ) + def get_session(self): """Get a new database session.""" return self._session_maker() @@ -514,6 +525,7 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]: ) except Exception as e: # Reset in_progress on failure + self._logger.error("Spawn coding agent failed", feature_id=feature_id, error=str(e)[:200]) session = self.get_session() try: feature = session.query(Feature).filter(Feature.id == feature_id).first() @@ -539,6 +551,7 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]: self.on_status(feature_id, "running") print(f"Started coding agent for feature #{feature_id}", flush=True) + self._logger.info("Spawned coding agent", feature_id=feature_id, pid=proc.pid) return True, f"Started feature {feature_id}" def _spawn_testing_agent(self) -> tuple[bool, str]: @@ -788,9 +801,11 @@ def _on_agent_complete( return # Coding agent completion + agent_status = "success" if return_code == 0 else "failed" debug_log.log("COMPLETE", f"Coding agent for feature #{feature_id} finished", return_code=return_code, - status="success" if return_code == 0 else "failed") + status=agent_status) + self._logger.info("Coding agent completed", feature_id=feature_id, status=agent_status, return_code=return_code) with self._lock: self.running_coding_agents.pop(feature_id, None) @@ -826,6 +841,7 @@ def _on_agent_complete( print(f"Feature #{feature_id} has failed {failure_count} times, will not retry", flush=True) debug_log.log("COMPLETE", f"Feature #{feature_id} exceeded max retries", failure_count=failure_count) + 
"""
Structured Logging Module
=========================

Enhanced logging with structured JSON format, filtering, and export capabilities.

Features:
- JSON-formatted logs with consistent schema
- Filter by agent, feature, level
- Substring search in messages
- Timeline view for agent activity
- Export logs for offline analysis

Log Format:
{
    "timestamp": "2025-01-21T10:30:00.000Z",
    "level": "debug|info|warn|error",
    "agent_id": "coding-42",
    "feature_id": 42,
    "tool_name": "feature_mark_passing",
    "duration_ms": 150,
    "message": "Feature marked as passing"
}
"""

import contextlib
import csv
import hashlib
import json
import logging
import sqlite3
import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterator, Literal, Optional

# Type aliases
# Note: Python's logging uses "warning" but we normalize to "warn" for consistency
LogLevel = Literal["debug", "info", "warn", "warning", "error"]

# Both spellings of the warning level; rows are stored as "warn".
_WARN_LEVELS = ("warn", "warning")


def _format_ts(dt: datetime) -> str:
    """
    Format a datetime as a fixed-width ISO 8601 string with UTC "Z" suffix.

    Timezone-aware datetimes are converted to UTC first; naive datetimes are
    assumed to already be UTC.

    The output always carries exactly three fractional digits. A fixed width
    matters because timestamps are stored as TEXT and compared/sorted as
    strings in SQL: with a variable-width fraction, "…:00Z" would sort
    *after* "…:00.500000Z".

    Args:
        dt: Datetime to format

    Returns:
        ISO 8601 string with "Z" suffix (e.g., "2025-01-21T10:30:00.000Z")
    """
    if dt.tzinfo is None:
        # Treat naive datetimes as UTC
        dt = dt.replace(tzinfo=timezone.utc)
    else:
        # Convert to UTC if timezone-aware
        dt = dt.astimezone(timezone.utc)
    return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")


@dataclass
class StructuredLogEntry:
    """A structured log entry with all metadata."""

    timestamp: str
    level: LogLevel
    message: str
    agent_id: Optional[str] = None
    feature_id: Optional[int] = None
    tool_name: Optional[str] = None
    duration_ms: Optional[int] = None
    extra: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary, omitting unset optional fields."""
        result: dict[str, Any] = {
            "timestamp": self.timestamp,
            "level": self.level,
            "message": self.message,
        }
        if self.agent_id:
            result["agent_id"] = self.agent_id
        if self.feature_id is not None:
            result["feature_id"] = self.feature_id
        if self.tool_name:
            result["tool_name"] = self.tool_name
        if self.duration_ms is not None:
            result["duration_ms"] = self.duration_ms
        if self.extra:
            result["extra"] = self.extra
        return result

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict())


class StructuredLogHandler(logging.Handler):
    """
    Custom logging handler that stores structured logs in SQLite.

    A fresh connection is opened (and closed) for every write, and writes are
    serialized through a per-handler lock.
    """

    def __init__(
        self,
        db_path: Path,
        agent_id: Optional[str] = None,
        max_entries: int = 10000,
    ):
        """
        Args:
            db_path: Path of the SQLite database file (created if missing)
            agent_id: Agent ID used when a record does not carry its own
            max_entries: Row count above which the oldest rows are pruned
        """
        super().__init__()
        self.db_path = db_path
        self.agent_id = agent_id
        self.max_entries = max_entries
        self._lock = threading.Lock()
        self._insert_count = 0
        self._cleanup_interval = 100  # Run cleanup every N inserts
        self._init_database()

    def _init_database(self) -> None:
        """Create the logs table and its indexes if they do not exist yet."""
        # sqlite3's own context manager only commits/rolls back; closing() is
        # what actually releases the connection.
        with self._lock, contextlib.closing(sqlite3.connect(self.db_path)) as conn:
            # Enable WAL mode for better concurrency with parallel agents
            # WAL allows readers and writers to work concurrently without blocking
            conn.execute("PRAGMA journal_mode=WAL")

            conn.execute("""
                CREATE TABLE IF NOT EXISTS logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    level TEXT NOT NULL,
                    message TEXT NOT NULL,
                    agent_id TEXT,
                    feature_id INTEGER,
                    tool_name TEXT,
                    duration_ms INTEGER,
                    extra TEXT
                )
            """)

            # Indexes for the common filter columns
            for column in ("timestamp", "level", "agent_id", "feature_id"):
                conn.execute(
                    f"CREATE INDEX IF NOT EXISTS idx_logs_{column} ON logs({column})"
                )

            conn.commit()

    def _prune(self, conn: sqlite3.Connection) -> None:
        """Delete the oldest rows so that at most ``max_entries`` remain."""
        count = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
        excess = count - self.max_entries
        if excess > 0:
            conn.execute(
                """
                DELETE FROM logs WHERE id IN (
                    SELECT id FROM logs ORDER BY timestamp ASC, id ASC LIMIT ?
                )
                """,
                (excess,),
            )

    def emit(self, record: logging.LogRecord) -> None:
        """
        Store a log record in the database.

        Any failure (formatting, serialization, SQLite) is routed to
        ``logging.Handler.handleError`` instead of propagating to the caller.
        """
        try:
            # Normalize "warning" -> "warn" for consistency
            level = record.levelname.lower()
            if level == "warning":
                level = "warn"
            entry = StructuredLogEntry(
                # record.created is when the event happened, not when we got the lock
                timestamp=_format_ts(datetime.fromtimestamp(record.created, tz=timezone.utc)),
                level=level,
                message=self.format(record),
                agent_id=getattr(record, "agent_id", None) or self.agent_id,
                feature_id=getattr(record, "feature_id", None),
                tool_name=getattr(record, "tool_name", None),
                duration_ms=getattr(record, "duration_ms", None),
                extra=getattr(record, "extra", None) or {},
            )
            # default=str: a value that is not JSON-native (Path, exception, ...)
            # is stringified rather than causing the whole record to be dropped.
            extra_json = json.dumps(entry.extra, default=str) if entry.extra else None

            with self._lock, contextlib.closing(sqlite3.connect(self.db_path)) as conn:
                with conn:  # commit on success, roll back on error
                    conn.execute(
                        """
                        INSERT INTO logs
                        (timestamp, level, message, agent_id, feature_id, tool_name, duration_ms, extra)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            entry.timestamp,
                            entry.level,
                            entry.message,
                            entry.agent_id,
                            entry.feature_id,
                            entry.tool_name,
                            entry.duration_ms,
                            extra_json,
                        ),
                    )

                    # Cleanup old entries periodically (not on every insert)
                    self._insert_count += 1
                    if self._insert_count >= self._cleanup_interval:
                        self._insert_count = 0
                        self._prune(conn)

        except Exception:
            self.handleError(record)


class StructuredLogger:
    """
    Enhanced logger with structured logging capabilities.

    The log message is positional-only, so any keyword - including
    ``message=`` or ``level=`` - can be used as a structured field.

    Usage:
        logger = StructuredLogger(project_dir, agent_id="coding-1")
        logger.info("Starting feature", feature_id=42)
        logger.error("Test failed", feature_id=42, tool_name="playwright")
        logger.error("Session error", error_type="ValueError", message="boom")
    """

    def __init__(
        self,
        project_dir: Path,
        agent_id: Optional[str] = None,
        console_output: bool = True,
    ):
        """
        Args:
            project_dir: Project directory; logs go to ``.autocoder/logs.db`` inside it
            agent_id: Agent identifier stored with every entry
            console_output: Whether to also echo INFO+ entries to the console
        """
        self.project_dir = Path(project_dir)
        self.agent_id = agent_id
        self.db_path = self.project_dir / ".autocoder" / "logs.db"

        # Ensure directory exists
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        # Setup logger with unique name per instance to avoid handler accumulation
        # across tests and multiple invocations. Include project path hash for uniqueness.
        # The hash is only a name component, hence usedforsecurity=False.
        path_hash = hashlib.md5(
            str(self.project_dir).encode(), usedforsecurity=False
        ).hexdigest()[:8]
        logger_name = f"autocoder.{agent_id or 'main'}.{path_hash}.{id(self)}"
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.DEBUG)
        # Without this, records also reach the root logger's handlers, so
        # console_output=False would not actually silence console output.
        self.logger.propagate = False

        # Clear existing handlers (for safety, though names should be unique)
        self.logger.handlers.clear()

        # Add structured handler
        self.handler = StructuredLogHandler(self.db_path, agent_id)
        self.handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(self.handler)

        # Add console handler if requested
        if console_output:
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(
                logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
            )
            self.logger.addHandler(console)

    def _log(
        self,
        level: str,
        message: str,
        /,
        feature_id: Optional[int] = None,
        tool_name: Optional[str] = None,
        duration_ms: Optional[int] = None,
        **extra: Any,
    ) -> None:
        """
        Internal logging method with structured data.

        Args:
            level: Name of the ``logging.Logger`` method to call ("info", "warning", ...)
            message: Log message text
            feature_id: Optional feature the entry relates to
            tool_name: Optional tool the entry relates to
            duration_ms: Optional duration of the logged operation
            **extra: Any further fields; stored together as a JSON object
        """
        record_extra = {
            "agent_id": self.agent_id,
            "feature_id": feature_id,
            "tool_name": tool_name,
            "duration_ms": duration_ms,
            "extra": extra,
        }

        # Passed as LogRecord attributes; the handler reads them back in emit()
        getattr(self.logger, level)(message, extra=record_extra)

    def debug(self, message: str, /, **kwargs: Any) -> None:
        """Log debug message."""
        self._log("debug", message, **kwargs)

    def info(self, message: str, /, **kwargs: Any) -> None:
        """Log info message."""
        self._log("info", message, **kwargs)

    def warn(self, message: str, /, **kwargs: Any) -> None:
        """Log warning message."""
        self._log("warning", message, **kwargs)

    def warning(self, message: str, /, **kwargs: Any) -> None:
        """Log warning message (alias)."""
        self._log("warning", message, **kwargs)

    def error(self, message: str, /, **kwargs: Any) -> None:
        """Log error message."""
        self._log("error", message, **kwargs)


class LogQuery:
    """
    Query interface for structured logs.

    Supports filtering, searching, and aggregation. Every call opens its own
    connection and closes it before returning.
    """

    def __init__(self, db_path: Path):
        self.db_path = db_path

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection with name-based row access. Caller must close it."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _fetch_dicts(self, sql: str, params: list) -> list[dict]:
        """Run a SELECT and return all rows as plain dicts."""
        with contextlib.closing(self._connect()) as conn:
            return [dict(row) for row in conn.execute(sql, params)]

    @staticmethod
    def _build_where(
        level: Optional[str] = None,
        agent_id: Optional[str] = None,
        feature_id: Optional[int] = None,
        tool_name: Optional[str] = None,
        search: Optional[str] = None,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
    ) -> tuple[str, list]:
        """Translate the optional filters into a parameterized WHERE clause."""
        conditions: list[str] = []
        params: list = []

        if level:
            if level in _WARN_LEVELS:
                # Rows are stored as "warn"; accept either spelling from callers
                conditions.append("level IN ('warn', 'warning')")
            else:
                conditions.append("level = ?")
                params.append(level)

        if agent_id:
            conditions.append("agent_id = ?")
            params.append(agent_id)

        if feature_id is not None:
            conditions.append("feature_id = ?")
            params.append(feature_id)

        if tool_name:
            conditions.append("tool_name = ?")
            params.append(tool_name)

        if search:
            conditions.append("message LIKE ? ESCAPE '\\'")
            # Escape LIKE wildcards to prevent unexpected query behavior
            # Escape backslash FIRST, then LIKE wildcards
            escaped_search = search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            params.append(f"%{escaped_search}%")

        if since:
            conditions.append("timestamp >= ?")
            params.append(_format_ts(since))

        if until:
            conditions.append("timestamp <= ?")
            params.append(_format_ts(until))

        return (" AND ".join(conditions) if conditions else "1=1"), params

    def query(
        self,
        level: Optional[LogLevel] = None,
        agent_id: Optional[str] = None,
        feature_id: Optional[int] = None,
        tool_name: Optional[str] = None,
        search: Optional[str] = None,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """
        Query logs with filters, newest first.

        Args:
            level: Filter by log level ("warn" and "warning" are equivalent)
            agent_id: Filter by agent ID
            feature_id: Filter by feature ID
            tool_name: Filter by tool name
            search: Substring search in message (LIKE wildcards are escaped)
            since: Start datetime (inclusive)
            until: End datetime (inclusive)
            limit: Max results
            offset: Pagination offset

        Returns:
            List of log entries as dicts
        """
        where_clause, params = self._build_where(
            level, agent_id, feature_id, tool_name, search, since, until
        )
        # "id DESC" breaks ties between equal timestamps so that LIMIT/OFFSET
        # paging has a stable order.
        sql = f"""
            SELECT * FROM logs
            WHERE {where_clause}
            ORDER BY timestamp DESC, id DESC
            LIMIT ? OFFSET ?
        """
        return self._fetch_dicts(sql, [*params, limit, offset])

    def count(
        self,
        level: Optional[LogLevel] = None,
        agent_id: Optional[str] = None,
        feature_id: Optional[int] = None,
        since: Optional[datetime] = None,
    ) -> int:
        """Count logs matching filters."""
        where_clause, params = self._build_where(
            level=level, agent_id=agent_id, feature_id=feature_id, since=since
        )
        with contextlib.closing(self._connect()) as conn:
            row = conn.execute(f"SELECT COUNT(*) FROM logs WHERE {where_clause}", params).fetchone()
        return row[0]

    def get_timeline(
        self,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
        bucket_minutes: int = 5,
    ) -> list[dict]:
        """
        Get activity timeline bucketed by time intervals.

        Buckets are aligned within each hour, so ``bucket_minutes`` is
        meaningful for values from 1 to 60.

        Args:
            since: Start datetime (defaults to 24 hours ago)
            until: End datetime (defaults to now)
            bucket_minutes: Bucket width in minutes

        Returns:
            List of buckets, each with per-agent counts, a total and an error count

        Raises:
            ValueError: If ``bucket_minutes`` is less than 1
        """
        if bucket_minutes < 1:
            # Would otherwise be a division by zero inside the SQL expression
            raise ValueError("bucket_minutes must be >= 1")

        # Default to last 24 hours
        if since is None:
            since = datetime.now(timezone.utc) - timedelta(hours=24)
        if until is None:
            until = datetime.now(timezone.utc)

        rows = self._fetch_dicts(
            """
            SELECT
                strftime('%Y-%m-%d %H:', timestamp) ||
                printf('%02d', (CAST(strftime('%M', timestamp) AS INTEGER) / ?) * ?) || ':00' as bucket,
                agent_id,
                COUNT(*) as count,
                SUM(CASE WHEN level = 'error' THEN 1 ELSE 0 END) as errors
            FROM logs
            WHERE timestamp >= ? AND timestamp <= ?
            GROUP BY bucket, agent_id
            ORDER BY bucket
            """,
            [bucket_minutes, bucket_minutes, _format_ts(since), _format_ts(until)],
        )

        # Group the per-(bucket, agent) rows by bucket
        buckets: dict[Any, dict] = {}
        for row in rows:
            bucket = buckets.setdefault(
                row["bucket"],
                {"timestamp": row["bucket"], "agents": {}, "total": 0, "errors": 0},
            )
            bucket["agents"][row["agent_id"] or "main"] = row["count"]
            bucket["total"] += row["count"]
            bucket["errors"] += row["errors"]

        return list(buckets.values())

    def get_agent_stats(self, since: Optional[datetime] = None) -> list[dict]:
        """Get log statistics per agent, busiest agent first."""
        where_clause, params = self._build_where(since=since)

        results = self._fetch_dicts(
            f"""
            SELECT
                agent_id,
                COUNT(*) as total,
                SUM(CASE WHEN level = 'info' THEN 1 ELSE 0 END) as info_count,
                SUM(CASE WHEN level = 'warn' OR level = 'warning' THEN 1 ELSE 0 END) as warn_count,
                SUM(CASE WHEN level = 'error' THEN 1 ELSE 0 END) as error_count,
                MIN(timestamp) as first_log,
                MAX(timestamp) as last_log
            FROM logs
            WHERE {where_clause}
            GROUP BY agent_id
            ORDER BY total DESC
            """,
            params,
        )

        # Normalize timestamps to use "Z" suffix for UTC consistency
        for entry in results:
            for key in ("first_log", "last_log"):
                if entry.get(key):
                    entry[key] = entry[key].replace("+00:00", "Z")

        return results

    def _iter_logs(
        self,
        batch_size: int = 1000,
        **filters: Any,
    ) -> Iterator[dict]:
        """
        Iterate over logs in batches using LIMIT/OFFSET pagination.

        This avoids loading all logs into memory at once. Because paging is
        offset-based, rows inserted while iterating can shift later pages.

        Args:
            batch_size: Number of rows to fetch per batch
            **filters: Query filters passed to query()

        Yields:
            Log entries as dicts
        """
        offset = 0
        while True:
            batch = self.query(limit=batch_size, offset=offset, **filters)
            if not batch:
                break
            yield from batch
            offset += len(batch)
            # If we got fewer than batch_size, we've reached the end
            if len(batch) < batch_size:
                break

    def export_logs(
        self,
        output_path: Path,
        output_format: Literal["json", "jsonl", "csv"] = "jsonl",
        batch_size: int = 1000,
        **filters: Any,
    ) -> int:
        """
        Export logs to a UTF-8 file, streaming rows in batches.

        Args:
            output_path: Output file path (parent directories are created)
            output_format: Export format (json, jsonl, csv)
            batch_size: Number of rows to fetch per batch (default 1000)
            **filters: Query filters passed to query()

        Returns:
            Number of exported entries

        Raises:
            ValueError: If ``output_format`` is not one of the supported formats
        """
        if output_format not in ("json", "jsonl", "csv"):
            # Fail loudly instead of silently exporting nothing
            raise ValueError(f"Unsupported output_format: {output_format!r}")

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        logs = self._iter_logs(batch_size=batch_size, **filters)
        count = 0

        if output_format == "json":
            # Written element by element so the full list is never held in memory
            with open(output_path, "w", encoding="utf-8") as f:
                f.write("[\n")
                for log in logs:
                    if count:
                        f.write(",\n")
                    f.write("  " + json.dumps(log))
                    count += 1
                f.write("\n]")

        elif output_format == "jsonl":
            with open(output_path, "w", encoding="utf-8") as f:
                for log in logs:
                    f.write(json.dumps(log) + "\n")
                    count += 1

        else:  # csv
            with open(output_path, "w", newline="", encoding="utf-8") as f:
                writer = None
                for log in logs:
                    if writer is None:
                        # Header comes from the first row's columns
                        writer = csv.DictWriter(f, fieldnames=list(log.keys()))
                        writer.writeheader()
                    writer.writerow(log)
                    count += 1

        return count


def get_logger(
    project_dir: Path,
    agent_id: Optional[str] = None,
    console_output: bool = True,
) -> StructuredLogger:
    """
    Create a structured logger for a project.

    Args:
        project_dir: Project directory
        agent_id: Agent identifier (e.g., "coding-1", "initializer")
        console_output: Whether to also log to console

    Returns:
        A new StructuredLogger instance
    """
    return StructuredLogger(project_dir, agent_id, console_output)


def get_log_query(project_dir: Path) -> LogQuery:
    """
    Get log query interface for a project.

    Args:
        project_dir: Project directory

    Returns:
        LogQuery instance
    """
    db_path = Path(project_dir) / ".autocoder" / "logs.db"
    return LogQuery(db_path)
150) + self.assertEqual(result["extra"], {"key": "value"}) + + def test_to_json(self): + """Test JSON serialization.""" + entry = StructuredLogEntry( + timestamp="2025-01-21T10:30:00.000Z", + level="info", + message="Test", + ) + json_str = entry.to_json() + parsed = json.loads(json_str) + self.assertEqual(parsed["message"], "Test") + + +class TestStructuredLogHandler(TestCase): + """Tests for StructuredLogHandler.""" + + def setUp(self): + """Create temporary directory for tests.""" + self.temp_dir = tempfile.mkdtemp() + self.db_path = Path(self.temp_dir) / "logs.db" + + def tearDown(self): + """Clean up temporary files.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_creates_database(self): + """Test that handler creates database file.""" + _handler = StructuredLogHandler(self.db_path) # noqa: F841 - handler triggers DB creation + self.assertTrue(self.db_path.exists()) + + def test_creates_tables(self): + """Test that handler creates logs table.""" + _handler = StructuredLogHandler(self.db_path) # noqa: F841 - handler triggers table creation + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='logs'") + result = cursor.fetchone() + conn.close() + self.assertIsNotNone(result) + + def test_wal_mode_enabled(self): + """Test that WAL mode is enabled for concurrency.""" + _handler = StructuredLogHandler(self.db_path) # noqa: F841 - handler triggers WAL mode + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + cursor.execute("PRAGMA journal_mode") + result = cursor.fetchone()[0] + conn.close() + self.assertEqual(result.lower(), "wal") + + +class TestStructuredLogger(TestCase): + """Tests for StructuredLogger.""" + + def setUp(self): + """Create temporary project directory.""" + self.temp_dir = tempfile.mkdtemp() + self.project_dir = Path(self.temp_dir) + + def tearDown(self): + """Clean up temporary files.""" + shutil.rmtree(self.temp_dir, 
def test_log_info(self):
    """An info() call can be read back with its message, agent id and feature id."""
    agent_logger = get_logger(
        self.project_dir,
        agent_id="test-agent",
        console_output=False,
    )
    agent_logger.info("Test info message", feature_id=42)

    # Read the entry back through the query helper, filtered to info level.
    log_query = get_log_query(self.project_dir)
    info_rows = log_query.query(level="info")
    self.assertEqual(len(info_rows), 1)

    stored = info_rows[0]
    expected_fields = (
        ("message", "Test info message"),
        ("agent_id", "test-agent"),
        ("feature_id", 42),
    )
    for key, expected in expected_fields:
        self.assertEqual(stored[key], expected)
class TestLogQuery(TestCase):
    """Tests for LogQuery filtering, pagination, counting and agent statistics.

    setUp writes five entries: three from agent "coding-1" (all feature 1)
    and two from agent "coding-2" (both feature 2).
    """

    def setUp(self):
        """Create a scratch project directory and write the five sample entries."""
        scratch = tempfile.mkdtemp()
        self.temp_dir = scratch
        self.project_dir = Path(scratch)

        # Agent "coding-1": one info, one debug and one error entry for feature 1.
        first_agent = get_logger(self.project_dir, agent_id="coding-1", console_output=False)
        first_agent.info("Feature started", feature_id=1)
        first_agent.debug("Tool used", feature_id=1, tool_name="bash")
        first_agent.error("Test failed", feature_id=1, tool_name="playwright")

        # Agent "coding-2": two info entries for feature 2.
        second_agent = get_logger(self.project_dir, agent_id="coding-2", console_output=False)
        second_agent.info("Feature started", feature_id=2)
        second_agent.info("Feature completed", feature_id=2)

    def tearDown(self):
        """Remove the scratch directory; removal errors are ignored."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_query_by_level(self):
        """Filtering on level="error" yields the single error entry."""
        log_query = get_log_query(self.project_dir)
        error_rows = log_query.query(level="error")
        self.assertEqual(len(error_rows), 1)
        self.assertEqual(error_rows[0]["level"], "error")

    def test_query_by_agent_id(self):
        """Filtering on agent_id returns only that agent's two entries."""
        log_query = get_log_query(self.project_dir)
        rows = log_query.query(agent_id="coding-2")
        self.assertEqual(len(rows), 2)
        self.assertEqual(
            [row["agent_id"] for row in rows],
            ["coding-2"] * len(rows),
        )

    def test_query_by_feature_id(self):
        """Filtering on feature_id returns the three entries for feature 1."""
        log_query = get_log_query(self.project_dir)
        rows = log_query.query(feature_id=1)
        self.assertEqual(len(rows), 3)
        self.assertEqual(
            [row["feature_id"] for row in rows],
            [1] * len(rows),
        )

    def test_query_by_tool_name(self):
        """Filtering on tool_name returns the single playwright entry."""
        log_query = get_log_query(self.project_dir)
        rows = log_query.query(tool_name="playwright")
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0]["tool_name"], "playwright")

    def test_query_full_text_search(self):
        """Searching for "Feature started" matches the two entries with that message."""
        log_query = get_log_query(self.project_dir)
        matches = log_query.query(search="Feature started")
        self.assertEqual(len(matches), 2)

    def test_query_with_limit(self):
        """limit=2 caps the result at two entries."""
        log_query = get_log_query(self.project_dir)
        limited = log_query.query(limit=2)
        self.assertEqual(len(limited), 2)

    def test_query_with_offset(self):
        """offset=2 skips two entries relative to the unfiltered result."""
        log_query = get_log_query(self.project_dir)
        everything = log_query.query()
        after_offset = log_query.query(offset=2, limit=10)
        expected_remaining = len(everything) - 2
        self.assertEqual(len(after_offset), expected_remaining)

    def test_count(self):
        """count() reports five entries overall and one at error level."""
        log_query = get_log_query(self.project_dir)
        self.assertEqual(log_query.count(), 5)
        self.assertEqual(log_query.count(level="error"), 1)

    def test_get_agent_stats(self):
        """get_agent_stats() has one record per agent; coding-1 shows one error."""
        log_query = get_log_query(self.project_dir)
        per_agent = log_query.get_agent_stats()
        self.assertEqual(len(per_agent), 2)  # coding-1 and coding-2

        # Locate the record for coding-1 (None if it is missing).
        coding1_stats = None
        for record in per_agent:
            if record["agent_id"] == "coding-1":
                coding1_stats = record
                break
        self.assertIsNotNone(coding1_stats)
        self.assertEqual(coding1_stats["error_count"], 1)
def test_export_csv(self):
    """Export the logs as CSV and check the reported and parsed row counts.

    export_logs() must report three exported entries, create the output
    file, and the file must parse into three data rows (csv.DictReader
    consumes the first line as the header).
    """
    import csv  # local import: csv is not among this module's top-level imports

    query = get_log_query(self.project_dir)
    output_path = self.export_dir / "logs.csv"
    count = query.export_logs(output_path, output_format="csv")

    self.assertEqual(count, 3)
    self.assertTrue(output_path.exists())

    # The csv module expects files opened with newline="" so that it can do
    # its own line-ending handling; without it, newlines embedded in quoted
    # fields are not interpreted correctly.
    with open(output_path, newline="") as f:
        rows = list(csv.DictReader(f))
    self.assertEqual(len(rows), 3)
def test_concurrent_read_write(self):
    """Test that reads and writes can happen concurrently.

    A writer thread appends 50 entries while a reader thread repeatedly
    calls count(). The test passes when neither thread raised, every
    observed count includes the 10 pre-populated entries, and the final
    count is 60.
    """
    logger = get_logger(self.project_dir, agent_id="writer", console_output=False)
    query = get_log_query(self.project_dir)

    # Pre-populate some logs
    for i in range(10):
        logger.info(f"Initial log {i}")

    read_results = []
    thread_errors = []  # exceptions raised inside either worker thread
    write_done = threading.Event()

    def writer():
        try:
            for i in range(50):
                logger.info(f"Concurrent log {i}")
        except Exception as exc:  # re-surfaced through the assertion below
            thread_errors.append(exc)
        finally:
            # Always release the reader, even if a write failed; otherwise
            # the reader would loop forever and hang the test run.
            write_done.set()

    def reader():
        try:
            # Read at least once, so the test does not depend on the reader
            # being scheduled before the writer finishes.
            while True:
                read_results.append(query.count())
                if write_done.is_set():
                    break
                time.sleep(0.01)  # Avoid busy-spin to reduce CPU usage in CI
        except Exception as exc:  # re-surfaced through the assertion below
            thread_errors.append(exc)

    writer_thread = threading.Thread(target=writer)
    reader_thread = threading.Thread(target=reader)

    writer_thread.start()
    reader_thread.start()

    writer_thread.join()
    reader_thread.join()

    # Exceptions in worker threads do not propagate to the test on their
    # own, so check for them explicitly.
    self.assertEqual(thread_errors, [])

    # Reads returned valid counts
    self.assertTrue(len(read_results) > 0)
    self.assertTrue(all(r >= 10 for r in read_results))  # At least initial logs

    # Final count should be 60 (10 initial + 50 concurrent)
    final_count = query.count()
    self.assertEqual(final_count, 60)
def test_cleanup_old_entries(self):
    """Test that old entries are cleaned up when max_entries is exceeded.

    Twenty messages are logged through a handler created with
    max_entries=10 and a cleanup interval lowered to 5; afterwards the
    logs table must contain exactly 10 rows.
    """
    import logging

    # Create handler with low max_entries
    db_path = self.project_dir / ".autocoder" / "logs.db"
    db_path.parent.mkdir(parents=True, exist_ok=True)
    handler = StructuredLogHandler(db_path, max_entries=10)
    # Set low cleanup interval so cleanup triggers during test
    handler._cleanup_interval = 5

    # Create a logger using this handler
    logger = logging.getLogger("test_cleanup")
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    try:
        # Write more than max_entries (and more than cleanup_interval to trigger cleanup)
        for i in range(20):
            logger.info(f"Log message {i}")

        # Query the database directly; close the connection even on failure.
        conn = sqlite3.connect(db_path)
        try:
            count = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
        finally:
            conn.close()
    finally:
        # logging.getLogger() returns a process-wide object: detach the
        # handler so it does not outlive this test or write to a database
        # in an already-deleted temp directory.
        logger.removeHandler(handler)
        # NOTE(review): assumes StructuredLogHandler provides the standard
        # logging.Handler.close() - confirm against structured_logging.
        handler.close()

    # Should have exactly max_entries after cleanup
    self.assertEqual(count, 10)