Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/packages/core/agent_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
)
from ._workflows._edge_runner import create_edge_runner
from ._workflows._events import (
OrchestrationComplete,
WorkflowErrorDetails,
WorkflowEvent,
WorkflowEventSource,
Expand Down Expand Up @@ -261,6 +262,7 @@
"MiddlewareTermination",
"MiddlewareType",
"MiddlewareTypes",
"OrchestrationComplete",
"OuterFinalT",
"OuterUpdateT",
"RawAgent",
Expand Down
51 changes: 44 additions & 7 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ..exceptions import AgentInvalidRequestException, AgentInvalidResponseException
from ._checkpoint import CheckpointStorage
from ._events import (
OrchestrationComplete,
WorkflowEvent,
)
from ._message_utils import normalize_messages_input
Expand Down Expand Up @@ -449,6 +450,7 @@ def _convert_workflow_events_to_agent_response(
raw_representations: list[object] = []
merged_usage: UsageDetails | None = None
latest_created_at: str | None = None
orchestration_complete: OrchestrationComplete | None = None

for output_event in output_events:
if output_event.type == "request_info":
Expand All @@ -465,6 +467,12 @@ def _convert_workflow_events_to_agent_response(
raw_representations.append(output_event)
else:
data = output_event.data
if isinstance(data, OrchestrationComplete):
# Save for fallback: some orchestrations (e.g. group_chat) only emit
# OrchestrationComplete without separate per-agent output events.
orchestration_complete = data
continue

if isinstance(data, AgentResponseUpdate):
# We cannot support AgentResponseUpdate in non-streaming mode. This is because the message
# sequence cannot be guaranteed when there are streaming updates in between non-streaming
Expand All @@ -475,8 +483,10 @@ def _convert_workflow_events_to_agent_response(
)

if isinstance(data, AgentResponse):
messages.extend(data.messages)
raw_representations.append(data.raw_representation)
non_user_messages = [msg for msg in data.messages if msg.role != "user"]
messages.extend(non_user_messages)
if non_user_messages:
raw_representations.append(data.raw_representation)
merged_usage = add_usage_details(merged_usage, data.usage_details)
latest_created_at = (
data.created_at
Expand All @@ -486,12 +496,15 @@ def _convert_workflow_events_to_agent_response(
else latest_created_at
)
elif isinstance(data, Message):
messages.append(data)
raw_representations.append(data.raw_representation)
if data.role != "user":
messages.append(data)
raw_representations.append(data.raw_representation)
elif is_instance_of(data, list[Message]):
chat_messages = cast(list[Message], data)
messages.extend(chat_messages)
raw_representations.append(data)
non_user_messages = [msg for msg in chat_messages if msg.role != "user"]
messages.extend(non_user_messages)
if non_user_messages:
raw_representations.append(data)
else:
contents = self._extract_contents(data)
if not contents:
Expand All @@ -508,6 +521,15 @@ def _convert_workflow_events_to_agent_response(
)
raw_representations.append(data)

# Fallback: if no messages were collected from other output events but an
# OrchestrationComplete was emitted, use its messages. This covers orchestrations
# (e.g. group_chat) where the orchestrator runs agents internally and only
# emits OrchestrationComplete as a workflow output.
if not messages and orchestration_complete is not None:
messages.extend(orchestration_complete.messages)
if orchestration_complete.messages:
raw_representations.append(orchestration_complete)

return AgentResponse(
messages=messages,
response_id=response_id,
Expand Down Expand Up @@ -570,15 +592,26 @@ def _convert_workflow_event_to_agent_response_updates(
data = event.data
executor_id = event.executor_id

if isinstance(data, OrchestrationComplete):
# OrchestrationComplete carries the full conversation for direct
# workflow consumers but should not be included in streaming output.
return []

if isinstance(data, AgentResponseUpdate):
# Pass through AgentResponseUpdate directly (streaming from AgentExecutor)
# Pass through AgentResponseUpdate directly (streaming from AgentExecutor).
# Filter user-role updates: orchestrations (e.g. HandoffBuilder) may emit the
# full conversation including user messages, which should not be echoed back.
if data.role == "user":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New early-return for AgentResponseUpdate with role='user' has no test coverage. An executor could emit an AgentResponseUpdate directly — add a test that yields one with role='user' and verifies it is dropped from the stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning early for user-role AgentResponseUpdate silently swallows the event. If a future workflow intentionally emits a user-role update (e.g., for a 'thinking' UI that replays the user prompt), this will be very hard to debug. Consider filtering at the handoff layer instead of the generic conversion layer, or at minimum add a comment explaining why user-role updates are universally suppressed.

return []
if not data.author_name:
data.author_name = executor_id
return [data]
if isinstance(data, AgentResponse):
# Convert each message in AgentResponse to an AgentResponseUpdate
updates: list[AgentResponseUpdate] = []
for msg in data.messages:
if msg.role == "user":
continue
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
Expand All @@ -593,6 +626,8 @@ def _convert_workflow_event_to_agent_response_updates(
)
return updates
if isinstance(data, Message):
if data.role == "user":
return []
return [
AgentResponseUpdate(
contents=list(data.contents),
Expand All @@ -609,6 +644,8 @@ def _convert_workflow_event_to_agent_response_updates(
chat_messages = cast(list[Message], data)
updates = []
for msg in chat_messages:
if msg.role == "user":
continue
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
Expand Down
21 changes: 21 additions & 0 deletions python/packages/core/agent_framework/_workflows/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@
DataT = TypeVar("DataT", default=Any)


@dataclass
class OrchestrationComplete:
"""Signals that an orchestration has finished and provides the full conversation.

Orchestrations yield this at termination so direct workflow consumers can access
the complete conversation history. When the workflow is used via ``WorkflowAgent``
(i.e. ``.as_agent()``), this event is filtered out and does not appear in the
``AgentResponse`` or ``AgentResponseUpdate`` output.
"""

messages: list[Any]
"""The full conversation messages accumulated during the orchestration."""

if sys.version_info >= (3, 13):
from typing import TypeVar # type: ignore # pragma: no cover
else:
from typing_extensions import TypeVar # type: ignore[import] # pragma: no cover

DataT = TypeVar("DataT", default=Any)


class WorkflowEventSource(str, Enum):
"""Identifies whether a workflow event came from the framework or an executor.

Expand Down
198 changes: 188 additions & 10 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Executor,
InMemoryHistoryProvider,
Message,
OrchestrationComplete,
ResponseStream,
SupportsAgentRun,
UsageDetails,
Expand Down Expand Up @@ -469,10 +470,11 @@ async def raw_yielding_executor(
assert updates[2].raw_representation.value == 42

async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None:
"""Test that yield_output with list[Message] extracts contents from all messages.
"""Test that yield_output with list[Message] extracts contents from non-user messages.

Note: Content items are coalesced by _finalize_response, so multiple text contents
become a single merged Content in the final response.
User-role messages are filtered out since agent responses should only contain
assistant/tool output. Content items are coalesced by _finalize_response, so
multiple text contents become a single merged Content in the final response.
"""

@executor
Expand All @@ -491,25 +493,201 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N
workflow = WorkflowBuilder(start_executor=list_yielding_executor).build()
agent = workflow.as_agent("list-msg-agent")

# Verify streaming returns the update with all 4 contents before coalescing
# Verify streaming returns updates for non-user messages only
updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

assert len(updates) == 3
assert len(updates) == 2
full_response = AgentResponse.from_updates(updates)
assert len(full_response.messages) == 3
assert len(full_response.messages) == 2
texts = [message.text for message in full_response.messages]
# Note: `from_agent_run_response_updates` coalesces multiple text contents into one content
assert texts == ["first message", "second message", "thirdfourth"]
# Note: AgentResponse.from_updates coalesces multiple text contents into a single merged Content
assert texts == ["second message", "thirdfourth"]

# Verify run()
result = await agent.run("test")

assert isinstance(result, AgentResponse)
assert len(result.messages) == 3
assert len(result.messages) == 2
texts = [message.text for message in result.messages]
assert texts == ["first message", "second message", "third fourth"]
assert texts == ["second message", "third fourth"]

async def test_workflow_as_agent_filters_user_role_agent_response_update(self) -> None:
"""Test that AgentResponseUpdate with role='user' is dropped from the stream."""

@executor
async def user_update_executor(messages: list[Message], ctx: WorkflowContext[Never, AgentResponseUpdate]) -> None:
# Emit a user-role AgentResponseUpdate directly
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="echoed user input")],
role="user",
author_name="test",
response_id="resp-1",
message_id="msg-1",
created_at="2026-01-01T00:00:00Z",
)
)
# Emit a valid assistant-role update
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="assistant reply")],
role="assistant",
author_name="test",
response_id="resp-1",
message_id="msg-2",
created_at="2026-01-01T00:00:00Z",
)
)

workflow = WorkflowBuilder(start_executor=user_update_executor).build()
agent = workflow.as_agent("user-update-agent")

updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

# User-role update should be filtered out
assert len(updates) == 1
assert updates[0].role == "assistant"
assert updates[0].contents[0].text == "assistant reply"

async def test_workflow_as_agent_filters_orchestration_complete_streaming(self) -> None:
"""Test that OrchestrationComplete events are filtered out in streaming mode.

Orchestrations emit OrchestrationComplete at termination to provide the full
conversation for direct workflow consumers. The WorkflowAgent should filter
these out so they don't appear as AgentResponseUpdate output.
"""
msg_id_1 = str(uuid.uuid4())
msg_id_2 = str(uuid.uuid4())

@executor
async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None:
# Simulate streaming: emit individual AgentResponseUpdate messages
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="first reply")],
role="assistant",
author_name="agent-a",
response_id="resp-1",
message_id=msg_id_1,
created_at="2026-01-01T00:00:00Z",
)
)
await ctx.yield_output(
AgentResponseUpdate(
contents=[Content.from_text(text="second reply")],
role="assistant",
author_name="agent-b",
response_id="resp-2",
message_id=msg_id_2,
created_at="2026-01-01T00:00:01Z",
)
)
# Simulate termination: emit OrchestrationComplete with full conversation
await ctx.yield_output(
OrchestrationComplete([
Message(role="user", text="user input", message_id="user-msg-1"),
Message(role="assistant", text="first reply", message_id=msg_id_1),
Message(role="assistant", text="second reply", message_id=msg_id_2),
])
)

workflow = WorkflowBuilder(start_executor=dedup_executor).build()
agent = workflow.as_agent("dedup-agent")

updates: list[AgentResponseUpdate] = []
async for update in agent.run("test", stream=True):
updates.append(update)

# Should have exactly 2 assistant updates — OrchestrationComplete is filtered out
assert len(updates) == 2
assert updates[0].contents[0].text == "first reply"
assert updates[1].contents[0].text == "second reply"

async def test_workflow_as_agent_filters_orchestration_complete_non_streaming(self) -> None:
"""Test that OrchestrationComplete events are filtered out in non-streaming mode."""
msg_id_1 = str(uuid.uuid4())
msg_id_2 = str(uuid.uuid4())

@executor
async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None:
# Emit individual AgentResponse with messages
await ctx.yield_output(
AgentResponse(
messages=[
Message(role="assistant", text="first reply", message_id=msg_id_1),
],
response_id="resp-1",
)
)
await ctx.yield_output(
AgentResponse(
messages=[
Message(role="assistant", text="second reply", message_id=msg_id_2),
],
response_id="resp-2",
)
)
# Emit OrchestrationComplete at termination
await ctx.yield_output(
OrchestrationComplete([
Message(role="user", text="user input", message_id="user-msg-1"),
Message(role="assistant", text="first reply", message_id=msg_id_1),
Message(role="assistant", text="second reply", message_id=msg_id_2),
])
)

workflow = WorkflowBuilder(start_executor=dedup_executor).build()
agent = workflow.as_agent("dedup-agent")

result = await agent.run("test")

# Should have exactly 2 assistant messages — OrchestrationComplete is filtered out
assert len(result.messages) == 2
assert result.messages[0].text == "first reply"
assert result.messages[1].text == "second reply"

async def test_workflow_as_agent_agent_response_raw_repr_consistency(self) -> None:
"""Test that AgentResponse with only user messages does not add orphan raw_representations."""

@executor
async def mixed_response_executor(
messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
) -> None:
# Emit an AgentResponse with only user messages
await ctx.yield_output(
AgentResponse(
messages=[Message(role="user", text="user only")],
response_id="resp-user-only",
)
)
# Emit an AgentResponse with mixed messages
await ctx.yield_output(
AgentResponse(
messages=[
Message(role="user", text="user msg"),
Message(role="assistant", text="assistant msg"),
],
response_id="resp-mixed",
)
)

workflow = WorkflowBuilder(start_executor=mixed_response_executor).build()
agent = workflow.as_agent("mixed-response-agent")

result = await agent.run("test")

# Only the assistant message from the mixed response should appear
assert len(result.messages) == 1
assert result.messages[0].text == "assistant msg"

# raw_representation should only contain the mixed response's raw_representation,
# not the user-only response's (which produced no output messages)
assert isinstance(result.raw_representation, list)
assert len(result.raw_representation) == 1

async def test_session_conversation_history_included_in_workflow_run(self) -> None:
"""Test that messages provided to agent.run() are passed through to the workflow."""
Expand Down
Loading