Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions my-agent/src/agent/loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Core agent loop: thin wrapper around the LangGraph agent graph."""

from __future__ import annotations

import hashlib
import logging

from ..config.settings import Settings
from ..store.factory import create_checkpointer
from .graph import build_graph

Comment on lines +1 to +11
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix import sorting per pipeline failure.

Ruff reports the import block is unsorted. Organize imports according to isort conventions.

Proposed fix
 """Core agent loop: thin wrapper around the LangGraph agent graph."""

 from __future__ import annotations

 import hashlib
 import logging

+from .graph import build_graph
 from ..config.settings import Settings
 from ..store.factory import create_checkpointer
-from .graph import build_graph
🧰 Tools
🪛 GitHub Actions: CI

[error] 1-8: ruff: Import block is unsorted or unformatted. Organize imports.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@my-agent/src/agent/loop.py` around lines 1 - 11, The import block in loop.py
is unsorted; reorder imports to follow isort conventions: keep "from __future__
import annotations" first, then standard library imports (hashlib, logging),
then third-party (none here), then local package imports (from ..config.settings
import Settings, from ..store.factory import create_checkpointer, from .graph
import build_graph); alternatively run isort/ruff --fix to automatically apply
this ordering and ensure the import groups and alphabetical order are correct.

logger = logging.getLogger(__name__)

# Per-1M-token pricing: (input, output)
_PRICING: dict[str, tuple[float, float]] = {
"anthropic": (3.0, 15.0), # Claude Sonnet
"gemini": (1.25, 10.0), # Gemini 2.5 Pro
"codex": (2.50, 10.0), # Codex
}


def _estimate_cost(provider: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate API cost in USD from token counts and provider."""
rate_in, rate_out = _PRICING.get(provider, (0.0, 0.0))
return (input_tokens * rate_in + output_tokens * rate_out) / 1_000_000


class AgentLoop:
"""Orchestrates the plan-implement-test-fix cycle via LangGraph."""

def __init__(self, settings: Settings, repo_name: str, task: str, branch: str | None = None):
"""Initialize with settings, target repo name, task description, and optional branch."""
self.settings = settings
self.task = task
self.branch = branch
self.repo_config = self._find_repo(repo_name)

def _find_repo(self, name: str):
"""Look up a RepoConfig by name, raising ValueError if not found."""
for r in self.settings.repositories:
if r.name == name:
return r
available = [r.name for r in self.settings.repositories]
raise ValueError(f"Repository '{name}' not found. Available: {available}")

def _get_model_name(self) -> str:
"""Return the model name for the active LLM provider."""
return self.settings.llm.get_model()

async def run(self) -> str:
"""Execute the full agent pipeline. Returns the PR URL or a summary."""
logger.info("Opening LangGraph state store (backend=%s)", self.settings.store.backend)
async with create_checkpointer(self.settings.store) as checkpointer:
logger.info("State store ready")
graph = build_graph(checkpointer=checkpointer)

# Deterministic thread_id for checkpoint resumability.
# When a branch is given we use it directly; otherwise we derive
# a stable id from repo+task so re-runs of the same task resume.
if self.branch:
thread_id = f"{self.repo_config.name}/{self.branch}"
else:
key = f"{self.repo_config.name}/{self.task}"
thread_id = hashlib.sha256(key.encode()).hexdigest()[:16]

initial_state = {
"task": self.task,
"settings": self.settings,
"repo_config": self.repo_config,
"thread_id": thread_id,
}
if self.branch:
initial_state["branch"] = self.branch

config: dict = {"configurable": {"thread_id": thread_id}}
if self.settings.tracing.enabled:
config["tags"] = [
f"repo:{self.repo_config.name}",
f"provider:{self.settings.llm.provider}",
f"language:{self.repo_config.language}",
]
config["metadata"] = {
"repo": self.repo_config.name,
"task": self.task,
"provider": self.settings.llm.provider,
"model": self._get_model_name(),
"language": self.repo_config.language,
"branch": self.branch or "",
"repo_url": self.repo_config.url,
}

final_state = await graph.ainvoke(initial_state, config=config)

token_usage = final_state.get("token_usage", {})
input_tokens = token_usage.get("input_tokens", 0)
output_tokens = token_usage.get("output_tokens", 0)
cost = _estimate_cost(self.settings.llm.provider, input_tokens, output_tokens)
logger.info(
"Token usage: %d input, %d output — estimated cost: $%.4f",
input_tokens, output_tokens, cost,
)

return final_state.get("result", "No result")
60 changes: 60 additions & 0 deletions my-agent/src/agent/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Graph state definition for the LangGraph agent."""

from __future__ import annotations

from pathlib import Path
from typing import Annotated, Any, TypedDict

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage
from langgraph.channels import UntrackedValue
from langgraph.graph.message import add_messages

from ..config.settings import RepoConfig, Settings
from ..git_ops.repo import GitRepo


class AgentState(TypedDict, total=False):
"""Typed state dictionary for the LangGraph agent pipeline."""

# Immutable inputs (set once at graph entry) — not checkpointed
task: Annotated[str, UntrackedValue]
settings: Annotated[Settings, UntrackedValue]
repo_config: Annotated[RepoConfig, UntrackedValue]
branch: Annotated[str | None, UntrackedValue]
thread_id: Annotated[str, UntrackedValue]

# Initialised in clone_and_branch, reused across nodes — not checkpointed
readme_preamble: Annotated[str, UntrackedValue]
messages: Annotated[list[BaseMessage], add_messages]
llm: Annotated[BaseChatModel, UntrackedValue]
git: Annotated[GitRepo, UntrackedValue]
repo_dir: Annotated[Path, UntrackedValue]
active_branch: Annotated[str, UntrackedValue]

# Control flow
prompt_type: str

# Plan phase
plan: dict[str, Any]
file_contents: str
plan_complete: bool

# Post-implement
changed_files: list[str]

# Test/fix loop
attempt: int
max_attempts: int
checks_passed: bool
test_output: str
fix_history: list[dict[str, str]]

# PR
pr_ready: bool

# Token tracking
token_usage: dict[str, int]

# Final output
result: str
Loading
Loading