Skip to content
Merged
3 changes: 2 additions & 1 deletion python/packages/claude/agent_framework_claude/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import importlib.metadata

from ._agent import ClaudeAgent, ClaudeAgentOptions, ClaudeAgentSettings
from ._agent import ClaudeAgent, ClaudeAgentOptions, ClaudeAgentSettings, RawClaudeAgent

try:
__version__ = importlib.metadata.version(__name__)
Expand All @@ -13,5 +13,6 @@
"ClaudeAgent",
"ClaudeAgentOptions",
"ClaudeAgentSettings",
"RawClaudeAgent",
"__version__",
]
129 changes: 68 additions & 61 deletions python/packages/claude/agent_framework_claude/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
normalize_tools,
)
from agent_framework.exceptions import AgentException
from agent_framework.observability import AgentTelemetryLayer
from claude_agent_sdk import (
AssistantMessage,
ClaudeSDKClient,
Expand Down Expand Up @@ -171,8 +172,11 @@ class ClaudeAgentOptions(TypedDict, total=False):
)


class ClaudeAgent(BaseAgent, Generic[OptionsT]):
"""Claude Agent using Claude Code CLI.
class RawClaudeAgent(BaseAgent, Generic[OptionsT]):
"""Claude Agent using Claude Code CLI without telemetry layers.

This is the core Claude agent implementation without OpenTelemetry instrumentation.
For most use cases, prefer :class:`ClaudeAgent` which includes telemetry support.

Wraps the Claude Agent SDK to provide agentic capabilities including
tool use, session management, and streaming responses.
Expand All @@ -188,45 +192,13 @@ class ClaudeAgent(BaseAgent, Generic[OptionsT]):

.. code-block:: python

from agent_framework_claude import ClaudeAgent
from agent_framework.anthropic import RawClaudeAgent

async with ClaudeAgent(
async with RawClaudeAgent(
instructions="You are a helpful assistant.",
) as agent:
response = await agent.run("Hello!")
print(response.text)

With streaming:

.. code-block:: python

async with ClaudeAgent() as agent:
async for update in agent.run("Write a poem"):
print(update.text, end="", flush=True)

With session management:

.. code-block:: python

async with ClaudeAgent() as agent:
session = agent.create_session()
await agent.run("Remember my name is Alice", session=session)
response = await agent.run("What's my name?", session=session)
# Claude will remember "Alice" from the same session

With Agent Framework tools:

.. code-block:: python

from agent_framework import tool

@tool
def greet(name: str) -> str:
\"\"\"Greet someone by name.\"\"\"
return f"Hello, {name}!"

async with ClaudeAgent(tools=[greet]) as agent:
response = await agent.run("Greet Alice")
"""

AGENT_PROVIDER_NAME: ClassVar[str] = "anthropic.claude"
Expand All @@ -246,7 +218,7 @@ def __init__(
env_file_path: str | None = None,
env_file_encoding: str | None = None,
) -> None:
"""Initialize a ClaudeAgent instance.
"""Initialize a RawClaudeAgent instance.

Args:
instructions: System prompt for the agent.
Expand Down Expand Up @@ -343,7 +315,7 @@ def _normalize_tools(
normalized = normalize_tools(tool)
self._custom_tools.extend(normalized)

async def __aenter__(self) -> ClaudeAgent[OptionsT]:
async def __aenter__(self) -> RawClaudeAgent[OptionsT]:
"""Start the agent when entering async context."""
await self.start()
return self
Expand Down Expand Up @@ -568,37 +540,60 @@ def _format_prompt(self, messages: list[Message] | None) -> str:
return ""
return "\n".join([msg.text or "" for msg in messages])

@property
def default_options(self) -> dict[str, Any]:
"""Expose options with ``instructions`` key.

Maps ``system_prompt`` to ``instructions`` for compatibility with
:class:`AgentTelemetryLayer`, which reads the system prompt from
the ``instructions`` key.
"""
opts = dict(self._default_options)
system_prompt = opts.pop("system_prompt", None)
if system_prompt is not None:
opts["instructions"] = system_prompt
return opts

def _finalize_response(self, updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]:
"""Build AgentResponse and propagate structured_output as value.

Args:
updates: The collected stream updates.

Returns:
An AgentResponse with structured_output set as value if present.
"""
structured_output = getattr(self, "_structured_output", None)
return AgentResponse.from_updates(updates, value=structured_output)

@overload
def run(
self,
messages: AgentRunInputs | None = None,
*,
stream: Literal[True],
stream: Literal[False] = ...,
session: AgentSession | None = None,
options: OptionsT | MutableMapping[str, Any] | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]: ...
) -> Awaitable[AgentResponse[Any]]: ...

@overload
async def run(
def run(
self,
messages: AgentRunInputs | None = None,
*,
stream: Literal[False] = ...,
stream: Literal[True],
session: AgentSession | None = None,
options: OptionsT | MutableMapping[str, Any] | None = None,
**kwargs: Any,
) -> AgentResponse[Any]: ...
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

def run(
self,
messages: AgentRunInputs | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
options: OptionsT | MutableMapping[str, Any] | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate] | Awaitable[AgentResponse[Any]]:
) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
"""Run the agent with the given messages.

Args:
Expand All @@ -609,33 +604,23 @@ def run(
returns an awaitable AgentResponse.
session: The conversation session. If session has service_session_id set,
the agent will resume that session.
options: Runtime options (model, permission_mode can be changed per-request).
kwargs: Additional keyword arguments.
kwargs: Additional keyword arguments including 'options' for runtime options
(model, permission_mode can be changed per-request).

Returns:
When stream=True: An ResponseStream for streaming updates.
When stream=False: An Awaitable[AgentResponse] with the complete response.
"""
options = kwargs.pop("options", None)
response = ResponseStream(
self._get_stream(messages, session=session, options=options, **kwargs),
finalizer=self._finalize_response,
)

if stream:
return response
return response.get_final_response()

def _finalize_response(self, updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]:
"""Build AgentResponse and propagate structured_output as value.

Args:
updates: The collected stream updates.

Returns:
An AgentResponse with structured_output set as value if present.
"""
structured_output = getattr(self, "_structured_output", None)
return AgentResponse.from_updates(updates, value=structured_output)

async def _get_stream(
self,
messages: AgentRunInputs | None = None,
Expand Down Expand Up @@ -721,3 +706,25 @@ async def _get_stream(

# Store structured output for the finalizer
self._structured_output = structured_output


class ClaudeAgent(AgentTelemetryLayer, RawClaudeAgent[OptionsT], Generic[OptionsT]):
"""Claude Agent with OpenTelemetry instrumentation.

This is the recommended agent class for most use cases. It includes
OpenTelemetry-based telemetry for observability. For a minimal
implementation without telemetry, use :class:`RawClaudeAgent`.

Examples:
Basic usage with context manager:

.. code-block:: python

from agent_framework.anthropic import ClaudeAgent

async with ClaudeAgent(
instructions="You are a helpful assistant.",
) as agent:
response = await agent.run("Hello!")
print(response.text)
"""
Loading