diff --git a/Makefile b/Makefile index e7ac91e..b14877c 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,13 @@ -.PHONY: chat dev console +.PHONY: dev console install start install: uv sync && uv run pre-commit install && cp .env.example .env && echo "Please edit the .env file with your API keys." -chat: - uv run chat - console: uv run textual console -x SYSTEM -x EVENT -x DEBUG -x INFO dev: uv run textual run --dev -c chat + +start: + uv run chat diff --git a/README.md b/README.md index 308394c..d73532e 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ make install Update the `.env` with your `ANTHROPIC_API_KEY` and then run: ```bash -make chat +make start # Alternatively, if in dev (see below) make dev diff --git a/agent-chat-cli.config.yaml b/agent-chat-cli.config.yaml index 2f114b7..4f06d50 100644 --- a/agent-chat-cli.config.yaml +++ b/agent-chat-cli.config.yaml @@ -10,7 +10,7 @@ model: haiku include_partial_messages: true # Enable dynamic MCP server inference -mcp_server_inference: true +mcp_server_inference: false # Named agents with custom configurations # agents: @@ -57,4 +57,4 @@ mcp_servers: disallowed_tools: [] # Permission mode for tool execution -permission_mode: "bypassPermissions" +permission_mode: "default" diff --git a/src/agent_chat_cli/app.py b/src/agent_chat_cli/app.py index d6703bd..132e6a0 100644 --- a/src/agent_chat_cli/app.py +++ b/src/agent_chat_cli/app.py @@ -7,6 +7,7 @@ from agent_chat_cli.components.header import Header from agent_chat_cli.components.chat_history import ChatHistory, MessagePosted from agent_chat_cli.components.thinking_indicator import ThinkingIndicator +from agent_chat_cli.components.tool_permission_prompt import ToolPermissionPrompt from agent_chat_cli.components.user_input import UserInput from agent_chat_cli.system.agent_loop import AgentLoop from agent_chat_cli.system.message_bus import MessageBus @@ -38,12 +39,14 @@ def __init__(self) -> None: ) self.actions = Actions(self) + self.pending_tool_permission: 
dict | None = None def compose(self) -> ComposeResult: with VerticalScroll(): yield Header() yield ChatHistory() yield ThinkingIndicator() + yield ToolPermissionPrompt(actions=self.actions) yield UserInput(actions=self.actions) async def on_mount(self) -> None: diff --git a/src/agent_chat_cli/components/tool_permission_prompt.py b/src/agent_chat_cli/components/tool_permission_prompt.py new file mode 100644 index 0000000..0748377 --- /dev/null +++ b/src/agent_chat_cli/components/tool_permission_prompt.py @@ -0,0 +1,92 @@ +from typing import Any, TYPE_CHECKING + +from textual.widget import Widget +from textual.app import ComposeResult +from textual.widgets import Label, Input +from textual.reactive import reactive + +from agent_chat_cli.components.caret import Caret +from agent_chat_cli.components.flex import Flex +from agent_chat_cli.components.spacer import Spacer +from agent_chat_cli.utils import get_tool_info +from agent_chat_cli.utils.logger import log_json + +if TYPE_CHECKING: + from agent_chat_cli.system.actions import Actions + + +class ToolPermissionPrompt(Widget): + is_visible = reactive(False) + tool_name = reactive("") + tool_input: dict[str, Any] = reactive({}, init=False) # type: ignore[assignment] + + def __init__(self, actions: "Actions") -> None: + super().__init__() + self.actions = actions + + def compose(self) -> ComposeResult: + yield Label("", id="tool-display") + yield Label(" [dim]Allow? 
(Enter=yes, ESC=no, or ask another question):[/dim]") + + yield Spacer() + + with Flex(): + yield Caret() + yield Input(placeholder="Yes", id="permission-input") + + def watch_is_visible(self, is_visible: bool) -> None: + self.display = is_visible + + if is_visible: + input_widget = self.query_one("#permission-input", Input) + input_widget.value = "" + input_widget.focus() + + def watch_tool_name(self, tool_name: str) -> None: + if not tool_name: + return + + tool_info = get_tool_info(tool_name) + tool_display_label = self.query_one("#tool-display", Label) + + if tool_info["server_name"]: + tool_display = ( + rf"[bold]Confirm Tool:[/] [cyan]\[{tool_info['server_name']}][/] " + f"{tool_info['tool_name']}" + ) + else: + tool_display = f"[bold]Confirm Tool:[/] {tool_name}" + + tool_display_label.update(tool_display) + + async def on_input_submitted(self, event: Input.Submitted) -> None: + raw_value = event.value + response = event.value.strip() or "yes" + + log_json( + { + "event": "permission_input_submitted", + "raw_value": raw_value, + "stripped_value": event.value.strip(), + "final_response": response, + } + ) + + await self.actions.respond_to_tool_permission(response) + + async def on_input_blurred(self, event: Input.Blurred) -> None: + if self.is_visible: + input_widget = self.query_one("#permission-input", Input) + input_widget.focus() + + async def on_key(self, event) -> None: + if event.key == "escape": + log_json({"event": "permission_escape_pressed"}) + + event.stop() + event.prevent_default() + + input_widget = self.query_one("#permission-input", Input) + input_widget.value = "no" + + await input_widget.action_submit() diff --git a/src/agent_chat_cli/docs/architecture.md b/src/agent_chat_cli/docs/architecture.md index 132a47a..85754c1 100644 --- a/src/agent_chat_cli/docs/architecture.md +++ b/src/agent_chat_cli/docs/architecture.md @@ -15,6 +15,7 @@ Textual widgets responsible for UI rendering: - **Message widgets**: SystemMessage, UserMessage, 
AgentMessage, ToolMessage - **UserInput**: Handles user text input and submission - **ThinkingIndicator**: Shows when agent is processing +- **ToolPermissionPrompt**: Interactive widget for approving/denying tool execution requests ### System Layer @@ -26,6 +27,9 @@ Manages the conversation loop with Claude SDK: - Emits AgentMessageType events (STREAM_EVENT, ASSISTANT, RESULT) - Manages session persistence via session_id - Supports dynamic MCP server inference and loading +- Implements `_can_use_tool` callback for interactive tool permission requests +- Uses `permission_lock` (asyncio.Lock) to serialize parallel permission requests +- Manages `permission_response_queue` for user responses to tool permission prompts #### MCP Server Inference (`system/mcp_inference.py`) Intelligently determines which MCP servers are needed for each query: @@ -42,19 +46,23 @@ Routes agent messages to appropriate UI components: - Controls thinking indicator state - Manages scroll-to-bottom behavior - Displays system messages (e.g., MCP server connection notifications) +- Detects tool permission requests and shows ToolPermissionPrompt +- Manages UI transitions between UserInput and ToolPermissionPrompt #### Actions (`system/actions.py`) Centralizes all user-initiated actions and controls: - **quit()**: Exits the application - **query(user_input)**: Sends user query to agent loop queue -- **interrupt()**: Stops streaming mid-execution by setting interrupt flag and calling SDK interrupt +- **interrupt()**: Stops streaming mid-execution by setting interrupt flag and calling SDK interrupt (ignores ESC when tool permission prompt is visible) - **new()**: Starts new conversation by sending NEW_CONVERSATION control command +- **respond_to_tool_permission(response)**: Handles tool permission responses, manages UI state transitions between permission prompt and user input - Manages UI state (thinking indicator, chat history clearing) -- Directly accesses agent_loop internals (query_queue, 
client, interrupting flag) +- Directly accesses agent_loop internals (query_queue, client, interrupting flag, permission_response_queue) Actions are triggered via: - Keybindings in app.py (ESC → action_interrupt, Ctrl+N → action_new) - Text commands in user_input.py ("exit", "clear") +- Component events (ToolPermissionPrompt.on_input_submitted → respond_to_tool_permission) ### Utils Layer @@ -211,6 +219,103 @@ mcp_servers: # ... rest of config ``` +## Tool Permission System + +The application implements interactive tool permission requests that allow users to approve or deny tool execution in real-time. + +### Components + +#### ToolPermissionPrompt (`components/tool_permission_prompt.py`) +Textual widget that displays permission requests to the user: +- Shows tool name with MCP server info +- Provides input field for user response +- Supports Enter (approve), ESC (deny), or custom text responses + +### Permission Flow + +``` +Tool Execution Request (from Claude SDK) + ↓ +AgentLoop._can_use_tool (callback with permission_lock acquired) + ↓ +Emit TOOL_PERMISSION_REQUEST AgentMessage with tool_name and tool_input + ↓ +MessageBus._handle_tool_permission_request receives the request + ↓ +Show ToolPermissionPrompt, hide UserInput + ↓ +User Response: + - Enter (or "yes") → Approve + - ESC (or "no") → Deny + - Custom text → Send to Claude as alternative instruction + ↓ +Actions.respond_to_tool_permission(response) + ↓ +Put response on permission_response_queue + ↓ +Hide ToolPermissionPrompt, show UserInput + ↓ +AgentLoop._can_use_tool receives response + ↓ +Return PermissionResultAllow or PermissionResultDeny + ↓ +Next tool permission request (if multiple tools called) +``` + +### Serialization with Permission Lock + +When multiple tools request permission in parallel, a `permission_lock` (asyncio.Lock) ensures they are handled sequentially: + +1. First tool acquires lock → Shows prompt → Waits for response → Releases lock +2. 
Second tool acquires lock → Shows prompt → Waits for response → Releases lock +3. Third tool acquires lock → Shows prompt → Waits for response → Releases lock + +This prevents race conditions where multiple prompts would overwrite each other and ensures each tool gets a dedicated user response. + +### Permission Responses + +The `_can_use_tool` callback returns typed permission results: + +**Approve (CONFIRM)**: +```python +return PermissionResultAllow( + behavior="allow", + updated_input=tool_input, +) +``` + +**Deny (DENY)**: +```python +return PermissionResultDeny( + behavior="deny", + message="User denied permission", + interrupt=True, +) +``` + +**Custom Response**: +```python +return PermissionResultDeny( + behavior="deny", + message=user_response, # Alternative instruction sent to Claude + interrupt=True, +) +``` + +### ESC Key Handling + +When ToolPermissionPrompt is visible, the ESC key is intercepted: +- `Actions.interrupt()` checks `permission_prompt.is_visible` +- If visible, returns early without interrupting the agent +- ToolPermissionPrompt's `on_key` handler processes ESC to deny the tool +- If not visible, ESC performs normal interrupt behavior + +### System Messages + +Permission denials generate system messages in the chat: +- **Denied**: `"Permission denied for {tool_name}"` +- **Custom response**: `"Custom response for {tool_name}: {user_response}"` + ## User Commands ### Text Commands @@ -219,7 +324,7 @@ mcp_servers: ### Keybindings - **Ctrl+C**: Quit application -- **ESC**: Interrupt streaming response +- **ESC**: Interrupt streaming response (or deny tool permission if prompt visible) - **Ctrl+N**: Start new conversation ## Session Management @@ -274,3 +379,5 @@ SDK reconnects to previous session with full history - Control commands are queued alongside user queries to ensure proper task ordering - Agent loop processes both strings (user queries) and ControlCommands from the same queue - Interrupt flag is checked on each streaming message to 
enable immediate stop +- Tool permission requests are serialized via asyncio.Lock to handle parallel tool calls sequentially +- Permission responses use typed SDK objects (PermissionResultAllow, PermissionResultDeny) rather than plain dictionaries diff --git a/src/agent_chat_cli/system/actions.py b/src/agent_chat_cli/system/actions.py index dea5526..c157339 100644 --- a/src/agent_chat_cli/system/actions.py +++ b/src/agent_chat_cli/system/actions.py @@ -1,7 +1,11 @@ +from textual.widgets import Input + from agent_chat_cli.system.agent_loop import AgentLoop from agent_chat_cli.utils.enums import ControlCommand from agent_chat_cli.components.chat_history import ChatHistory from agent_chat_cli.components.thinking_indicator import ThinkingIndicator +from agent_chat_cli.components.tool_permission_prompt import ToolPermissionPrompt +from agent_chat_cli.utils.logger import log_json class Actions: @@ -16,6 +20,10 @@ async def query(self, user_input: str) -> None: await self.agent_loop.query_queue.put(user_input) async def interrupt(self) -> None: + permission_prompt = self.app.query_one(ToolPermissionPrompt) + if permission_prompt.is_visible: + return + self.agent_loop.interrupting = True await self.agent_loop.client.interrupt() @@ -30,3 +38,26 @@ async def new(self) -> None: thinking_indicator = self.app.query_one(ThinkingIndicator) thinking_indicator.is_thinking = False + + async def respond_to_tool_permission(self, response: str) -> None: + from agent_chat_cli.components.user_input import UserInput + + log_json( + { + "event": "permission_response_action", + "response": response, + } + ) + + await self.agent_loop.permission_response_queue.put(response) + + thinking_indicator = self.app.query_one(ThinkingIndicator) + thinking_indicator.is_thinking = True + + permission_prompt = self.app.query_one(ToolPermissionPrompt) + permission_prompt.is_visible = False + + user_input = self.app.query_one(UserInput) + user_input.display = True + input_widget = 
user_input.query_one(Input) + input_widget.focus() diff --git a/src/agent_chat_cli/system/agent_loop.py b/src/agent_chat_cli/system/agent_loop.py index 823e313..ccb8db6 100644 --- a/src/agent_chat_cli/system/agent_loop.py +++ b/src/agent_chat_cli/system/agent_loop.py @@ -11,6 +11,10 @@ SystemMessage, TextBlock, ToolUseBlock, + ToolPermissionContext, + PermissionResult, + PermissionResultAllow, + PermissionResultDeny, ) from agent_chat_cli.utils.config import ( @@ -44,25 +48,18 @@ def __init__( self.on_message = on_message self.query_queue: asyncio.Queue[str | ControlCommand] = asyncio.Queue() + self.permission_response_queue: asyncio.Queue[str] = asyncio.Queue() + self.permission_lock = asyncio.Lock() self._running = False self.interrupting = False - async def _initialize_client(self, mcp_servers: dict) -> None: - sdk_config = get_sdk_config(self.config) - sdk_config["mcp_servers"] = mcp_servers - - if self.session_id: - sdk_config["resume"] = self.session_id - - self.client = ClaudeSDKClient(options=ClaudeAgentOptions(**sdk_config)) - - await self.client.connect() - async def start(self) -> None: + # Boot MCP servers lazily if self.config.mcp_server_inference: await self._initialize_client(mcp_servers={}) else: + # Boot MCP servers all at once mcp_servers = { name: config.model_dump() for name, config in self.available_servers.items() @@ -75,12 +72,14 @@ async def start(self) -> None: while self._running: user_input = await self.query_queue.get() + # Check for new convo flags if isinstance(user_input, ControlCommand): if user_input == ControlCommand.NEW_CONVERSATION: self.inferred_servers.clear() await self.client.disconnect() + # Reset MCP servers based on config settings if self.config.mcp_server_inference: await self._initialize_client(mcp_servers={}) else: @@ -92,6 +91,7 @@ async def start(self) -> None: await self._initialize_client(mcp_servers=mcp_servers) continue + # Infer MCP servers based on user messages in chat if self.config.mcp_server_inference: 
inference_result = await infer_mcp_servers( user_message=user_input, @@ -100,6 +100,7 @@ async def start(self) -> None: session_id=self.session_id, ) + # If there are new results, create an updated mcp_server list if inference_result["new_servers"]: server_list = ", ".join(inference_result["new_servers"]) @@ -112,6 +113,8 @@ async def start(self) -> None: await asyncio.sleep(0.1) + # If there's updates, we reinitialize the agent SDK (with the + # persisted session_id from the turn, stored in the instance) await self.client.disconnect() mcp_servers = { @@ -126,6 +129,7 @@ async def start(self) -> None: # Send query await self.client.query(user_input) + # Wait for messages from Claude async for message in self.client.receive_response(): if self.interrupting: continue @@ -134,6 +138,20 @@ async def start(self) -> None: await self.on_message(AgentMessage(type=AgentMessageType.RESULT, data=None)) + async def _initialize_client(self, mcp_servers: dict) -> None: + sdk_config = get_sdk_config(self.config) + + sdk_config["mcp_servers"] = mcp_servers + sdk_config["can_use_tool"] = self._can_use_tool + + if self.session_id: + sdk_config["resume"] = self.session_id + + # Init the Agent + self.client = ClaudeSDKClient(options=ClaudeAgentOptions(**sdk_config)) + + await self.client.connect() + async def _handle_message(self, message: Any) -> None: if isinstance(message, SystemMessage): log_json(message.data) @@ -141,14 +159,17 @@ async def _handle_message(self, message: Any) -> None: if message.subtype == AgentMessageType.INIT.value and message.data.get( "session_id" ): + # When initializing the chat, we store the session_id for later self.session_id = message.data["session_id"] + # Handle streaming messages if hasattr(message, "event"): event = message.event # type: ignore[attr-defined] if event.get("type") == ContentType.CONTENT_BLOCK_DELTA.value: delta = event.get("delta", {}) + # Chunk in streaming text if delta.get("type") == ContentType.TEXT_DELTA.value: text_chunk = 
delta.get("text", "") @@ -159,9 +180,11 @@ async def _handle_message(self, message: Any) -> None: data={"text": text_chunk}, ) ) + elif isinstance(message, AssistantMessage): content = [] + # Handle different kinds of content types if hasattr(message, "content"): for block in message.content: # type: ignore[attr-defined] if isinstance(block, TextBlock): @@ -178,9 +201,90 @@ async def _handle_message(self, message: Any) -> None: } ) + # Finally, post the agent assistant response await self.on_message( AgentMessage( type=AgentMessageType.ASSISTANT, data={"content": content}, ) ) + + async def _can_use_tool( + self, + tool_name: str, + tool_input: dict[str, Any], + context: ToolPermissionContext, + ) -> PermissionResult: + """Agent SDK handler for tool use permissions""" + + # Handle permission request queue + async with self.permission_lock: + await self.on_message( + AgentMessage( + type=AgentMessageType.TOOL_PERMISSION_REQUEST, + data={ + "tool_name": tool_name, + "tool_input": tool_input, + }, + ) + ) + + # Grab response from permission queue + user_response = await self.permission_response_queue.get() + response = user_response.lower().strip() + + CONFIRM = response in ["y", "yes", "allow", ""] + DENY = response in ["n", "no", "deny"] + + log_json( + { + "event": "tool_permission_decision", + "response": response, + "CONFIRM": CONFIRM, + "DENY": DENY, + } + ) + + if CONFIRM: + return PermissionResultAllow( + behavior="allow", + updated_input=tool_input, + ) + + if DENY: + await self.on_message( + AgentMessage( + type=AgentMessageType.SYSTEM, + data=f"Permission denied for {tool_name}", + ) + ) + + await self.on_message( + AgentMessage( + type=AgentMessageType.ASSISTANT, + data="What should we do differently?", + ) + ) + + return PermissionResultDeny( + behavior="deny", + message="User denied permission", + interrupt=True, + ) + + # If a user instead typed in a message (instead of confirming or denying) + # forward this on to the agent. 
+ await self.on_message( + AgentMessage( + type=AgentMessageType.SYSTEM, + data=f"Custom response for {tool_name}: {user_response}", + ) + ) + + await self.client.query(user_response) + + return PermissionResultDeny( + behavior="deny", + message=user_response, + interrupt=True, + ) diff --git a/src/agent_chat_cli/system/message_bus.py b/src/agent_chat_cli/system/message_bus.py index d7ba305..2688c3d 100644 --- a/src/agent_chat_cli/system/message_bus.py +++ b/src/agent_chat_cli/system/message_bus.py @@ -6,6 +6,7 @@ from agent_chat_cli.components.chat_history import ChatHistory, MessagePosted from agent_chat_cli.components.thinking_indicator import ThinkingIndicator +from agent_chat_cli.components.tool_permission_prompt import ToolPermissionPrompt from agent_chat_cli.components.user_input import UserInput from agent_chat_cli.components.messages import ( AgentMessage as AgentMessageWidget, @@ -14,6 +15,7 @@ ) from agent_chat_cli.system.agent_loop import AgentMessage from agent_chat_cli.utils.enums import AgentMessageType, ContentType +from agent_chat_cli.utils.logger import log_json if TYPE_CHECKING: from textual.app import App @@ -36,12 +38,15 @@ async def handle_agent_message(self, message: AgentMessage) -> None: case AgentMessageType.SYSTEM: await self._handle_system(message) + case AgentMessageType.TOOL_PERMISSION_REQUEST: + await self._handle_tool_permission_request(message) + case AgentMessageType.RESULT: await self._handle_result() async def _scroll_to_bottom(self) -> None: - """Scroll the container to the bottom after a slight pause.""" await asyncio.sleep(0.1) + container = self.app.query_one(VerticalScroll) container.scroll_end(animate=False, immediate=True) @@ -59,10 +64,12 @@ async def _handle_stream_event(self, message: AgentMessage) -> None: agent_msg = AgentMessageWidget() agent_msg.message = text_chunk + # Append to chat history chat_history.mount(agent_msg) self.current_agent_message = agent_msg else: self.current_response_text += text_chunk + 
markdown = self.current_agent_message.query_one(Markdown) markdown.update(self.current_response_text) @@ -86,6 +93,8 @@ async def _handle_assistant(self, message: AgentMessage) -> None: tool_msg = ToolMessage() tool_msg.tool_name = tool_name tool_msg.tool_input = tool_input + + # Append to chat history chat_history.mount(tool_msg) await self._scroll_to_bottom() @@ -95,7 +104,30 @@ async def _handle_system(self, message: AgentMessage) -> None: message.data if isinstance(message.data, str) else str(message.data) ) + # Dispatch message self.app.post_message(MessagePosted(Message.system(system_content))) + + await self._scroll_to_bottom() + + async def _handle_tool_permission_request(self, message: AgentMessage) -> None: + log_json( + { + "event": "showing_permission_prompt", + "tool_name": message.data.get("tool_name", ""), + } + ) + + thinking_indicator = self.app.query_one(ThinkingIndicator) + thinking_indicator.is_thinking = False + + permission_prompt = self.app.query_one(ToolPermissionPrompt) + permission_prompt.tool_name = message.data.get("tool_name", "") + permission_prompt.tool_input = message.data.get("tool_input", {}) + permission_prompt.is_visible = True + + user_input = self.app.query_one(UserInput) + user_input.display = False + await self._scroll_to_bottom() async def _handle_result(self) -> None: diff --git a/src/agent_chat_cli/system/styles.tcss b/src/agent_chat_cli/system/styles.tcss index e18cf9e..4583b27 100644 --- a/src/agent_chat_cli/system/styles.tcss +++ b/src/agent_chat_cli/system/styles.tcss @@ -27,6 +27,7 @@ VerticalScroll { padding-top: 1; padding-left: 2; background: transparent; + scrollbar-size: 0 0; } Header { @@ -49,6 +50,11 @@ BalloonSpinner { margin-right: 1; } +ToolPermissionPrompt { + height: auto; + margin-bottom: 1; +} + UserInput { height: auto; padding-bottom: 1; diff --git a/src/agent_chat_cli/utils/enums.py b/src/agent_chat_cli/utils/enums.py index 89e4959..d90a518 100644 --- a/src/agent_chat_cli/utils/enums.py +++ 
b/src/agent_chat_cli/utils/enums.py @@ -7,6 +7,7 @@ class AgentMessageType(Enum): RESULT = "result" STREAM_EVENT = "stream_event" SYSTEM = "system" + TOOL_PERMISSION_REQUEST = "tool_permission_request" class ContentType(Enum):