Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/packages/core/agent_framework/_workflows/_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ async def _run_agent(self, ctx: WorkflowContext[Never, AgentResponse]) -> AgentR
"""
run_kwargs, options = self._prepare_agent_run_args(ctx.get_state(WORKFLOW_RUN_KWARGS_KEY, {}))

if not self._cache:
logger.warning(
"AgentExecutor %s: Running agent with empty message cache. "
"This could lead to service error for some LLM providers.",
self.id,
)

response = await self._agent.run(
self._cache,
stream=False,
Expand Down Expand Up @@ -362,6 +369,13 @@ async def _run_agent_streaming(self, ctx: WorkflowContext[Never, AgentResponseUp
"""
run_kwargs, options = self._prepare_agent_run_args(ctx.get_state(WORKFLOW_RUN_KWARGS_KEY, {}))

if not self._cache:
logger.warning(
"AgentExecutor %s: Running agent with empty message cache. "
"This could lead to service error for some LLM providers.",
self.id,
)

updates: list[AgentResponseUpdate] = []
streamed_user_input_requests: list[Content] = []
stream = self._agent.run(
Expand Down
12 changes: 6 additions & 6 deletions python/packages/core/agent_framework/_workflows/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,19 @@ async def _deliver_message_inner(edge_runner: EdgeRunner, message: WorkflowMessa
"""Inner loop to deliver a single message through an edge runner."""
return await edge_runner.send_message(message, self._state, self._ctx)

async def _deliver_messages_for_edge_runner(edge_runner: EdgeRunner) -> None:
# Preserve message order per edge runner (and therefore per routed target path)
# while still allowing parallelism across different edge runners.
for message in source_messages:
await _deliver_message_inner(edge_runner, message)

# Route all messages through normal workflow edges
associated_edge_runners = self._edge_runner_map.get(source_executor_id, [])
if not associated_edge_runners:
# This is expected for terminal nodes (e.g., EndWorkflow, last action in workflow)
logger.debug(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
return

async def _deliver_messages_for_edge_runner(edge_runner: EdgeRunner) -> None:
# Preserve message order per edge runner (and therefore per routed target path)
# while still allowing parallelism across different edge runners.
for message in source_messages:
await _deliver_message_inner(edge_runner, message)

tasks = [_deliver_messages_for_edge_runner(edge_runner) for edge_runner in associated_edge_runners]
await asyncio.gather(*tasks)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,15 @@
from dataclasses import dataclass
from typing import Any

from agent_framework import Agent, SupportsAgentRun
from agent_framework import Agent, AgentResponse, Message, SupportsAgentRun
from agent_framework._middleware import FunctionInvocationContext, FunctionMiddleware
from agent_framework._sessions import AgentSession
from agent_framework._tools import FunctionTool, tool
from agent_framework._types import AgentResponse, Content, Message
from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest
from agent_framework._workflows._agent_utils import resolve_agent_id
from agent_framework._workflows._checkpoint import CheckpointStorage
from agent_framework._workflows._events import WorkflowEvent
from agent_framework._workflows._request_info_mixin import response_handler
from agent_framework._workflows._typing_utils import is_chat_agent
from agent_framework._workflows._workflow import Workflow
from agent_framework._workflows._workflow_builder import WorkflowBuilder
from agent_framework._workflows._workflow_context import WorkflowContext
Expand Down Expand Up @@ -265,89 +263,7 @@ def _prepare_agent_with_handoffs(

return cloned_agent

def _persist_pending_approval_function_calls(self) -> None:
    """Append the function calls that are awaiting approval to the full conversation.

    Handoff workflows force ``store=False`` and replay conversation state from
    ``_full_conversation``. When a run pauses for a function approval, ``AgentExecutor``
    returns ``None``, so the assistant message carrying the function call never arrives
    as an ``AgentResponse``. Unless that call is recorded here, the next turn may submit
    only a function result, which responses-style APIs reject.

    Nothing is appended when no pending request carries a function call.
    """
    approval_calls: list[Content] = []
    for pending in self._pending_agent_requests.values():
        # Only approval requests carry a function call worth replaying.
        if pending.type == "function_approval_request":
            call = getattr(pending, "function_call", None)
            if isinstance(call, Content) and call.type == "function_call":
                approval_calls.append(call)

    if approval_calls:
        # A single assistant message holds every pending call.
        self._full_conversation.append(
            Message(
                role="assistant",
                contents=approval_calls,
                author_name=self._agent.name,
            )
        )

def _persist_missing_approved_function_results(
    self,
    *,
    runtime_tool_messages: list[Message],
    response_messages: list[Message],
) -> None:
    """Record function results for approved calls that the history does not yet contain.

    On approval resumes, function invocation can execute approved tools without always
    surfacing their outputs in the returned ``AgentResponse.messages``. Stateless handoff
    replays need every function call to be paired with a result, so any result that is
    absent from both ``_full_conversation`` and ``response_messages`` is appended as a
    single ``tool`` message.

    Args:
        runtime_tool_messages: Messages from the current run that may hold function
            results or approval responses.
        response_messages: Messages returned by the agent for the current run.
    """
    results_by_call_id: dict[str, Content] = {}
    for tool_message in runtime_tool_messages:
        for item in tool_message.contents:
            if item.type == "function_result":
                # A concrete result always wins, even over an earlier placeholder.
                result_id = getattr(item, "call_id", None)
                if isinstance(result_id, str) and result_id:
                    results_by_call_id[result_id] = item
            elif item.type == "function_approval_response" and item.approved:
                approved_call = getattr(item, "function_call", None)
                approved_id = getattr(approved_call, "call_id", None) or getattr(item, "id", None)
                if isinstance(approved_id, str) and approved_id and approved_id not in results_by_call_id:
                    # Placeholder result for an approved call whose real output
                    # is not present in the runtime messages.
                    results_by_call_id[approved_id] = Content.from_function_result(
                        call_id=approved_id,
                        result='{"status":"approved"}',
                    )

    if not results_by_call_id:
        return

    # Call ids that already have a result in the stored history or in this response.
    already_recorded: set[str] = {
        item.call_id
        for message in (*self._full_conversation, *response_messages)
        for item in message.contents
        if item.type == "function_result" and isinstance(item.call_id, str) and item.call_id
    }

    # Sorted so the appended message has a deterministic content order.
    unrecorded_ids = sorted(results_by_call_id.keys() - already_recorded)
    if not unrecorded_ids:
        return

    self._full_conversation.append(
        Message(
            role="tool",
            contents=[results_by_call_id[call_id] for call_id in unrecorded_ids],
            author_name=self._agent.name,
        )
    )

def _clone_chat_agent(self, agent: Agent[Any]) -> Agent[Any]:
def _clone_chat_agent(self, agent: Agent) -> Agent:
"""Produce a deep copy of the Agent while preserving runtime configuration."""
options = agent.default_options

Expand All @@ -362,7 +278,6 @@ def _clone_chat_agent(self, agent: Agent[Any]) -> Agent[Any]:
cloned_options = deepcopy(options)
# Disable parallel tool calls to prevent the agent from invoking multiple handoff tools at once.
cloned_options["allow_multiple_tool_calls"] = False
cloned_options["store"] = False
cloned_options["tools"] = new_tools

# restore the original tools, in case they are shared between agents
Expand Down Expand Up @@ -427,45 +342,15 @@ def _handoff_tool() -> None:
@override
async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None:
"""Override to support handoff."""
incoming_messages = list(self._cache)
cleaned_incoming_messages = clean_conversation_for_handoff(incoming_messages)
runtime_tool_messages = [
message
for message in incoming_messages
if any(
content.type
in {
"function_result",
"function_approval_response",
}
for content in message.contents
)
or message.role == "tool"
]

# When the full conversation is empty, it means this is the first run.
# Broadcast the initial cache to all other agents. Subsequent runs won't
# need this since responses are broadcast after each agent run and user input.
if self._is_start_agent and not self._full_conversation:
await self._broadcast_messages(cleaned_incoming_messages, ctx)

# Persist only cleaned chat history between turns to avoid replaying stale tool calls.
self._full_conversation.extend(cleaned_incoming_messages)

# Always run with full conversation context for request_info resumes.
# Keep runtime tool-control messages for this run only (e.g., approval responses).
self._cache = list(self._full_conversation)
self._cache.extend(runtime_tool_messages)

# Handoff workflows are orchestrator-stateful and provider-stateless by design.
# If an existing session still has a service conversation id, clear it to avoid
# replaying stale unresolved tool calls across resumed turns.
if (
is_chat_agent(self._agent)
and self._agent.default_options.get("store") is False
and self._session.service_session_id is not None
):
self._session.service_session_id = None
await self._broadcast_messages(self._cache.copy(), ctx)

# Full conversation maintains the chat history between agents across handoffs,
# excluding internal agent messages such as tool calls and results.
self._full_conversation.extend(self._cache.copy())

# Check termination condition before running the agent
if await self._check_terminate_and_yield(ctx):
Expand All @@ -484,36 +369,35 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None:

# A function approval request is issued by the base AgentExecutor
if response is None:
if is_chat_agent(self._agent) and self._agent.default_options.get("store") is False:
self._persist_pending_approval_function_calls()
# Agent did not complete (e.g., waiting for user input); do not emit response
logger.debug("AgentExecutor %s: Agent did not complete, awaiting user input", self.id)
return

# Remove function call related content from the agent response for broadcast.
# This prevents replaying stale tool artifacts to other agents.
# Remove function call related content from the agent response for full conversation history
cleaned_response = clean_conversation_for_handoff(response.messages)

# For internal tracking, preserve the full response (including function_calls)
# in _full_conversation so that Azure OpenAI can match function_calls with
# function_results when the workflow resumes after user approvals.
self._full_conversation.extend(response.messages)
self._persist_missing_approved_function_results(
runtime_tool_messages=runtime_tool_messages,
response_messages=response.messages,
)
# Append the agent response to the full conversation history. This list removes
# function call related content such that the result stays consistent regardless
# of which agent yields the final output.
self._full_conversation.extend(cleaned_response)

# Broadcast only the cleaned response to other agents (without function_calls/results)
await self._broadcast_messages(cleaned_response, ctx)

# Check if a handoff was requested
if handoff_target := self._is_handoff_requested(response):
if is_handoff_requested := self._is_handoff_requested(response):
handoff_target, handoff_message = is_handoff_requested
if handoff_target not in self._handoff_targets:
raise ValueError(
f"Agent '{resolve_agent_id(self._agent)}' attempted to handoff to unknown "
f"target '{handoff_target}'. Valid targets are: {', '.join(self._handoff_targets)}"
)

# Add the handoff message to the cache so that the next invocation of the agent includes
# the tool call result. This is necessary because each tool call must have a corresponding
# tool result.
self._cache.append(handoff_message)

await ctx.send_message(
AgentExecutorRequest(messages=[], should_respond=True),
target_id=handoff_target,
Expand Down Expand Up @@ -590,12 +474,25 @@ async def _broadcast_messages(
# Since all agents are connected via fan-out, we can directly send the message
await ctx.send_message(agent_executor_request)

def _is_handoff_requested(self, response: AgentResponse) -> str | None:
def _is_handoff_requested(self, response: AgentResponse) -> tuple[str, Message] | None:
"""Determine if the agent response includes a handoff request.

If a handoff tool is invoked, the middleware will short-circuit execution
and provide a synthetic result that includes the target agent ID. The message
that contains the function result will be the last message in the response.

Args:
response: The AgentResponse to inspect for handoff requests

Returns:
A tuple of (target_agent_id, message) if a handoff is requested, or None if no handoff is requested

Note:
The returned message is the full message that contains the handoff function result content. This is
needed to complete the agent's chat history due to the `_AutoHandoffMiddleware` short-circuiting
behavior, which prevents the handoff tool call and result from being included in the agent response
messages. By returning the full message, we can ensure the agent's chat history remains valid with
a function result for the handoff tool call.
"""
if not response.messages:
return None
Expand All @@ -618,7 +515,7 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None:
if parsed_payload:
handoff_target = parsed_payload.get(HANDOFF_FUNCTION_RESULT_KEY)
if isinstance(handoff_target, str):
return handoff_target
return handoff_target, last_message
else:
continue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ def clean_conversation_for_handoff(conversation: list[Message]) -> list[Message]
- Drops all non-text content from every message.
- Drops messages with no remaining text content.
- Preserves original roles and author names for retained text messages.

Args:
conversation: Full conversation history, including tool-control content.

Returns:
Cleaned conversation history with only text content, suitable for handoff routing.
"""
cleaned: list[Message] = []
for msg in conversation:
# Keep only plain text history for handoff routing. Tool-control content
# (function_call/function_result/approval payloads) is runtime-only and
# must not be replayed in future model turns.
text_parts = [content.text for content in msg.contents if content.type == "text" and content.text]
# TODO(@taochen): This is a simplified check that treats any non-text content as a tool call.
# Enhance this logic to identify tool-related content specifically.
if not text_parts:
continue

Expand Down
Loading
Loading