From d26fc2598781132dd631d0f10169219db6a968c5 Mon Sep 17 00:00:00 2001 From: alliscode Date: Wed, 4 Mar 2026 13:31:43 -0800 Subject: [PATCH 1/6] Filter user-role messages from WorkflowAgent response stream When WorkflowAgent converts workflow output events to AgentResponseUpdate (streaming) or AgentResponse (non-streaming), user-role messages are now filtered out. This prevents orchestration executors (e.g. HandoffBuilder) that yield the full conversation from echoing the user's input back as part of the agent's response. The fix applies to all code paths in _convert_workflow_event_to_agent_response_updates and _convert_workflow_events_to_agent_response that process Message or list[Message] data, consistently skipping messages with role='user'. Fixes https://github.com/microsoft/agent-framework/issues/3206 --- .../core/agent_framework/_workflows/_agent.py | 15 ++++++-- .../tests/workflow/test_workflow_agent.py | 19 +++++----- .../orchestrations/tests/test_handoff.py | 37 +++++++++++++++++++ 3 files changed, 58 insertions(+), 13 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 3fb83803c4..6d2dc2e440 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -462,7 +462,7 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - messages.extend(data.messages) + messages.extend(msg for msg in data.messages if msg.role != "user") raw_representations.append(data.raw_representation) merged_usage = add_usage_details(merged_usage, data.usage_details) latest_created_at = ( @@ -473,11 +473,12 @@ def _convert_workflow_events_to_agent_response( else latest_created_at ) elif isinstance(data, Message): - messages.append(data) - raw_representations.append(data.raw_representation) + if data.role != "user": + messages.append(data) + raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): chat_messages = cast(list[Message], data) - messages.extend(chat_messages) + messages.extend(msg for msg in chat_messages if msg.role != "user") raw_representations.append(data) else: contents = self._extract_contents(data) @@ -566,6 +567,8 @@ def _convert_workflow_event_to_agent_response_updates( # Convert each message in AgentResponse to an AgentResponseUpdate updates: list[AgentResponseUpdate] = [] for msg in data.messages: + if msg.role == "user": + continue updates.append( AgentResponseUpdate( contents=list(msg.contents), @@ -580,6 +583,8 @@ def _convert_workflow_event_to_agent_response_updates( ) return updates if isinstance(data, Message): + if data.role == "user": + return [] return [ AgentResponseUpdate( contents=list(data.contents), @@ -596,6 +601,8 @@ def _convert_workflow_event_to_agent_response_updates( chat_messages = cast(list[Message], data) updates = [] for msg in chat_messages: + if msg.role == "user": + continue updates.append( AgentResponseUpdate( contents=list(msg.contents), diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index b2fbded39b..1a0ece53f6 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -469,10 +469,11 @@ async def raw_yielding_executor( assert updates[2].raw_representation.value == 42 async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None: - """Test that yield_output with list[Message] extracts contents from all messages. + """Test that yield_output with list[Message] extracts contents from non-user messages. - Note: Content items are coalesced by _finalize_response, so multiple text contents - become a single merged Content in the final response. + User-role messages are filtered out since agent responses should only contain + assistant/tool output. Content items are coalesced by _finalize_response, so + multiple text contents become a single merged Content in the final response. """ @executor @@ -491,25 +492,25 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N workflow = WorkflowBuilder(start_executor=list_yielding_executor).build() agent = workflow.as_agent("list-msg-agent") - # Verify streaming returns the update with all 4 contents before coalescing + # Verify streaming returns updates for non-user messages only updates: list[AgentResponseUpdate] = [] async for update in agent.run("test", stream=True): updates.append(update) - assert len(updates) == 3 + assert len(updates) == 2 full_response = AgentResponse.from_updates(updates) - assert len(full_response.messages) == 3 + assert len(full_response.messages) == 2 texts = [message.text for message in full_response.messages] # Note: `from_agent_run_response_updates` coalesces multiple text contents into one content - assert texts == ["first message", "second message", "thirdfourth"] + assert texts == ["second message", "thirdfourth"] # Verify run() result = await agent.run("test") assert isinstance(result, AgentResponse) - assert len(result.messages) == 3 + assert len(result.messages) == 2 texts = [message.text for message in result.messages] - assert texts == ["first message", "second message", "third fourth"] + assert texts == ["second message", "third fourth"] async def test_session_conversation_history_included_in_workflow_run(self) -> None: """Test that messages provided to agent.run() are passed through to the workflow.""" diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index e0d94355b6..ad67f5725f 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -8,6 +8,7 @@ import pytest from agent_framework import ( Agent, + AgentResponseUpdate, BaseContextProvider, ChatResponse, ChatResponseUpdate, @@ -1091,3 +1092,39 @@ def regular_tool() -> str: call_next.assert_awaited_once() assert context.result is None + + +async def test_handoff_as_agent_run_stream_does_not_echo_user_input() -> None: + """WorkflowAgent wrapping a handoff workflow must not echo user input in streamed updates. + + When HandoffAgentExecutor emits the full conversation via ctx.yield_output() on + termination, user-role messages from that list should not appear as + AgentResponseUpdate items in the stream returned by WorkflowAgent.run_stream(). + """ + agent = MockHandoffAgent(name="single_agent") + + workflow = ( + HandoffBuilder( + participants=[agent], + # Terminate immediately after the agent responds (user msg + assistant msg = 2). + termination_condition=lambda conv: len(conv) >= 2, + ) + .with_start_agent(agent) + .build() + ) + + workflow_agent = workflow.as_agent(name="test_workflow_agent") + + user_input = "Hi! Can you help me with something?" + updates: list[AgentResponseUpdate] = [] + async for update in workflow_agent.run(user_input, stream=True): + updates.append(update) + + assert updates, "Expected at least one streaming update" + + # The core assertion: no update should carry the user role. + user_role_updates = [u for u in updates if u.role == "user"] + assert not user_role_updates, ( + f"User input was echoed back in the stream as {len(user_role_updates)} update(s). " + "Expected only assistant-role updates." + ) From 574936291052fe5eef91c6ddf576b7557312e4cc Mon Sep 17 00:00:00 2001 From: alliscode Date: Wed, 4 Mar 2026 14:43:06 -0800 Subject: [PATCH 2/6] Address PR #4482 review comments: filter user-role AgentResponseUpdate and fix docs - Filter user-role AgentResponseUpdate in streaming pass-through path to prevent echoing user input when a provider emits updates with role='user' (missed code path from original fix) - Fix test comment: from_agent_run_response_updates -> AgentResponse.from_updates - Fix test docstring: WorkflowAgent.run_stream() -> WorkflowAgent.run(..., stream=True) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/core/agent_framework/_workflows/_agent.py | 2 ++ python/packages/core/tests/workflow/test_workflow_agent.py | 2 +- python/packages/orchestrations/tests/test_handoff.py | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 3d194104ac..9d102a36e8 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -573,6 +573,8 @@ def _convert_workflow_event_to_agent_response_updates( if isinstance(data, AgentResponseUpdate): # Pass through AgentResponseUpdate directly (streaming from AgentExecutor) + if data.role == "user": + return [] if not data.author_name: data.author_name = executor_id return [data] diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index 83d5c4af38..ce6807d225 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -501,7 +501,7 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N full_response = AgentResponse.from_updates(updates) assert len(full_response.messages) == 2 texts = [message.text for message in full_response.messages] - # Note: `from_agent_run_response_updates` coalesces multiple text contents into one content + # Note: AgentResponse.from_updates coalesces multiple text contents into a single merged Content assert texts == ["second message", "thirdfourth"] # Verify run() diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index 7503953c08..4f6ce9469d 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -1125,7 +1125,7 @@ async def test_handoff_as_agent_run_stream_does_not_echo_user_input() -> None: When HandoffAgentExecutor emits the full conversation via ctx.yield_output() on termination, user-role messages from that list should not appear as - AgentResponseUpdate items in the stream returned by WorkflowAgent.run_stream(). + AgentResponseUpdate items in the stream returned by WorkflowAgent.run(..., stream=True). """ agent = MockHandoffAgent(name="single_agent") From 2ffaebcf113fdc3a0a3adfb9832b01e5a285e634 Mon Sep 17 00:00:00 2001 From: alliscode Date: Wed, 4 Mar 2026 17:44:11 -0800 Subject: [PATCH 3/6] Address PR #4482 round 2: fix raw_representations consistency, add tests - Fix raw_representations inconsistency in AgentResponse and list[Message] branches: skip appending raw_representation when all messages are user-role, consistent with the single Message branch behavior. - Add explanatory comment for user-role AgentResponseUpdate filtering. - Add test: AgentResponseUpdate with role=user is dropped from stream. - Add test: AgentResponse with user-only messages produces no orphan raw_representations. - Add non-streaming assertion to handoff echo test. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/agent_framework/_workflows/_agent.py | 16 ++-- .../tests/workflow/test_workflow_agent.py | 79 +++++++++++++++++++ .../orchestrations/tests/test_handoff.py | 8 ++ 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 9d102a36e8..f25ed5c6f4 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -475,8 +475,10 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - messages.extend(msg for msg in data.messages if msg.role != "user") - raw_representations.append(data.raw_representation) + non_user_messages = [msg for msg in data.messages if msg.role != "user"] + messages.extend(non_user_messages) + if non_user_messages: + raw_representations.append(data.raw_representation) merged_usage = add_usage_details(merged_usage, data.usage_details) latest_created_at = ( data.created_at @@ -491,8 +493,10 @@ def _convert_workflow_events_to_agent_response( raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): chat_messages = cast(list[Message], data) - messages.extend(msg for msg in chat_messages if msg.role != "user") - raw_representations.append(data) + non_user_messages = [msg for msg in chat_messages if msg.role != "user"] + messages.extend(non_user_messages) + if non_user_messages: + raw_representations.append(data) else: contents = self._extract_contents(data) if not contents: @@ -572,7 +576,9 @@ def _convert_workflow_event_to_agent_response_updates( executor_id = event.executor_id if isinstance(data, AgentResponseUpdate): - # Pass through AgentResponseUpdate directly (streaming from AgentExecutor) + # Pass through AgentResponseUpdate directly (streaming from AgentExecutor). + # Filter user-role updates: orchestrations (e.g. HandoffBuilder) may emit the + # full conversation including user messages, which should not be echoed back. if data.role == "user": return [] if not data.author_name: diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index ce6807d225..fed72da11b 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -512,6 +512,85 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N texts = [message.text for message in result.messages] assert texts == ["second message", "third fourth"] + async def test_workflow_as_agent_filters_user_role_agent_response_update(self) -> None: + """Test that AgentResponseUpdate with role='user' is dropped from the stream.""" + + @executor + async def user_update_executor(messages: list[Message], ctx: WorkflowContext[Never, AgentResponseUpdate]) -> None: + # Emit a user-role AgentResponseUpdate directly + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="echoed user input")], + role="user", + author_name="test", + response_id="resp-1", + message_id="msg-1", + created_at="2026-01-01T00:00:00Z", + ) + ) + # Emit a valid assistant-role update + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="assistant reply")], + role="assistant", + author_name="test", + response_id="resp-1", + message_id="msg-2", + created_at="2026-01-01T00:00:00Z", + ) + ) + + workflow = WorkflowBuilder(start_executor=user_update_executor).build() + agent = workflow.as_agent("user-update-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("test", stream=True): + updates.append(update) + + # User-role update should be filtered out + assert len(updates) == 1 + assert updates[0].role == "assistant" + assert updates[0].contents[0].text == "assistant reply" + + async def test_workflow_as_agent_agent_response_raw_repr_consistency(self) -> None: + """Test that AgentResponse with only user messages does not add orphan raw_representations.""" + + @executor + async def mixed_response_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + # Emit an AgentResponse with only user messages + await ctx.yield_output( + AgentResponse( + messages=[Message(role="user", text="user only")], + response_id="resp-user-only", + ) + ) + # Emit an AgentResponse with mixed messages + await ctx.yield_output( + AgentResponse( + messages=[ + Message(role="user", text="user msg"), + Message(role="assistant", text="assistant msg"), + ], + response_id="resp-mixed", + ) + ) + + workflow = WorkflowBuilder(start_executor=mixed_response_executor).build() + agent = workflow.as_agent("mixed-response-agent") + + result = await agent.run("test") + + # Only the assistant message from the mixed response should appear + assert len(result.messages) == 1 + assert result.messages[0].text == "assistant msg" + + # raw_representation should only contain the mixed response's raw_representation, + # not the user-only response's (which produced no output messages) + assert isinstance(result.raw_representation, list) + assert len(result.raw_representation) == 1 + async def test_session_conversation_history_included_in_workflow_run(self) -> None: """Test that messages provided to agent.run() are passed through to the workflow.""" # Create an executor that captures all received messages diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index 4f6ce9469d..f2f255dc5a 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -1154,3 +1154,11 @@ async def test_handoff_as_agent_run_stream_does_not_echo_user_input() -> None: f"User input was echoed back in the stream as {len(user_role_updates)} update(s). " "Expected only assistant-role updates." ) + + # Also verify non-streaming path filters user messages + result = await workflow_agent.run(user_input) + user_role_messages = [m for m in result.messages if m.role == "user"] + assert not user_role_messages, ( + f"User input was echoed back in non-streaming result as {len(user_role_messages)} message(s). " + "Expected only assistant-role messages." + ) From 642dc7ee41e454749c51cd91e76b968a62d51f99 Mon Sep 17 00:00:00 2001 From: alliscode Date: Wed, 4 Mar 2026 18:09:42 -0800 Subject: [PATCH 4/6] Deduplicate messages re-emitted at orchestration termination Orchestrations like HandoffBuilder yield the full conversation at termination via ctx.yield_output(self._full_conversation), which re-emits all messages that were already streamed individually. This adds message_id-based deduplication in both streaming (_run_stream_impl) and non-streaming (_convert_workflow_events_to_agent_response) paths so duplicate messages are skipped. Fixes the conversation duplication issue identified in PR review. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/agent_framework/_workflows/_agent.py | 31 +++++- .../tests/workflow/test_workflow_agent.py | 94 +++++++++++++++++++ 2 files changed, 122 insertions(+), 3 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index f25ed5c6f4..f0a442f71d 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -328,11 +328,19 @@ async def _run_stream_impl( session_messages: list[Message] = session_context.get_messages(include_input=True) all_updates: list[AgentResponseUpdate] = [] + emitted_message_ids: set[str] = set() async for event in self._run_core( session_messages, checkpoint_id, checkpoint_storage, streaming=True, **kwargs ): updates = self._convert_workflow_event_to_agent_response_updates(response_id, event) for update in updates: + # Deduplicate: orchestrations (e.g. HandoffBuilder) may yield the full + # conversation at termination, re-emitting messages that were already + # streamed individually. Skip updates whose message_id was already sent. + if update.message_id and update.message_id in emitted_message_ids: + continue + if update.message_id: + emitted_message_ids.add(update.message_id) all_updates.append(update) yield update @@ -449,6 +457,7 @@ def _convert_workflow_events_to_agent_response( raw_representations: list[object] = [] merged_usage: UsageDetails | None = None latest_created_at: str | None = None + seen_message_ids: set[str] = set() for output_event in output_events: if output_event.type == "request_info": @@ -475,7 +484,14 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - non_user_messages = [msg for msg in data.messages if msg.role != "user"] + non_user_messages = [ + msg for msg in data.messages + if msg.role != "user" + and not (msg.message_id and msg.message_id in seen_message_ids) + ] + for msg in non_user_messages: + if msg.message_id: + seen_message_ids.add(msg.message_id) messages.extend(non_user_messages) if non_user_messages: raw_representations.append(data.raw_representation) @@ -488,12 +504,21 @@ def _convert_workflow_events_to_agent_response( else latest_created_at ) elif isinstance(data, Message): - if data.role != "user": + if data.role != "user" and not (data.message_id and data.message_id in seen_message_ids): + if data.message_id: + seen_message_ids.add(data.message_id) messages.append(data) raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): chat_messages = cast(list[Message], data) - non_user_messages = [msg for msg in chat_messages if msg.role != "user"] + non_user_messages = [ + msg for msg in chat_messages + if msg.role != "user" + and not (msg.message_id and msg.message_id in seen_message_ids) + ] + for msg in non_user_messages: + if msg.message_id: + seen_message_ids.add(msg.message_id) messages.extend(non_user_messages) if non_user_messages: raw_representations.append(data) diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index fed72da11b..a9cf884ef2 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -552,6 +552,100 @@ async def user_update_executor(messages: list[Message], ctx: WorkflowContext[Nev assert updates[0].role == "assistant" assert updates[0].contents[0].text == "assistant reply" + async def test_workflow_as_agent_deduplicates_streaming_messages(self) -> None: + """Test that duplicate messages are deduplicated in streaming mode. + + Orchestrations like HandoffBuilder emit messages individually during streaming + then re-emit the full conversation at termination. The WorkflowAgent should + deduplicate so the caller doesn't see repeated messages. + """ + msg_id_1 = str(uuid.uuid4()) + msg_id_2 = str(uuid.uuid4()) + + @executor + async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None: + # Simulate streaming: emit individual AgentResponseUpdate messages + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="first reply")], + role="assistant", + author_name="agent-a", + response_id="resp-1", + message_id=msg_id_1, + created_at="2026-01-01T00:00:00Z", + ) + ) + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="second reply")], + role="assistant", + author_name="agent-b", + response_id="resp-2", + message_id=msg_id_2, + created_at="2026-01-01T00:00:01Z", + ) + ) + # Simulate termination: re-emit the full conversation as list[Message] + # (this is what HandoffBuilder._check_terminate_and_yield does) + await ctx.yield_output([ + Message(role="user", text="user input", message_id="user-msg-1"), + Message(role="assistant", text="first reply", message_id=msg_id_1), + Message(role="assistant", text="second reply", message_id=msg_id_2), + ]) + + workflow = WorkflowBuilder(start_executor=dedup_executor).build() + agent = workflow.as_agent("dedup-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("test", stream=True): + updates.append(update) + + # Should have exactly 2 assistant updates — no duplicates from the list[Message] yield + assert len(updates) == 2 + assert updates[0].contents[0].text == "first reply" + assert updates[1].contents[0].text == "second reply" + + async def test_workflow_as_agent_deduplicates_non_streaming_messages(self) -> None: + """Test that duplicate messages are deduplicated in non-streaming mode.""" + msg_id_1 = str(uuid.uuid4()) + msg_id_2 = str(uuid.uuid4()) + + @executor + async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None: + # Emit individual AgentResponse with messages + await ctx.yield_output( + AgentResponse( + messages=[ + Message(role="assistant", text="first reply", message_id=msg_id_1), + ], + response_id="resp-1", + ) + ) + await ctx.yield_output( + AgentResponse( + messages=[ + Message(role="assistant", text="second reply", message_id=msg_id_2), + ], + response_id="resp-2", + ) + ) + # Re-emit the full conversation at termination + await ctx.yield_output([ + Message(role="user", text="user input", message_id="user-msg-1"), + Message(role="assistant", text="first reply", message_id=msg_id_1), + Message(role="assistant", text="second reply", message_id=msg_id_2), + ]) + + workflow = WorkflowBuilder(start_executor=dedup_executor).build() + agent = workflow.as_agent("dedup-agent") + + result = await agent.run("test") + + # Should have exactly 2 assistant messages — no duplicates from the list[Message] yield + assert len(result.messages) == 2 + assert result.messages[0].text == "first reply" + assert result.messages[1].text == "second reply" + async def test_workflow_as_agent_agent_response_raw_repr_consistency(self) -> None: """Test that AgentResponse with only user messages does not add orphan raw_representations.""" From fb4c61d647c3da286d1433cd489a5ef989f994ea Mon Sep 17 00:00:00 2001 From: alliscode Date: Thu, 5 Mar 2026 13:46:52 -0800 Subject: [PATCH 5/6] Introduce OrchestrationComplete event type for termination yields Replace raw list[Message] yields at orchestration termination with a new OrchestrationComplete wrapper type. This cleanly separates the full conversation (for direct workflow consumers) from messages that should appear in AgentResponse/AgentResponseUpdate output. Changes: - Add OrchestrationComplete dataclass in _events.py - Update handoff, group_chat, base_group_chat_orchestrator, and sequential to yield OrchestrationComplete instead of list[Message] - WorkflowAgent streaming converter skips OrchestrationComplete entirely - WorkflowAgent non-streaming converter uses OrchestrationComplete as fallback only when no other messages were collected (handles group_chat where orchestrator runs agents internally) - Remove message_id-based dedup logic (no longer needed) - Update all tests to use OrchestrationComplete Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/__init__.py | 2 + .../core/agent_framework/_workflows/_agent.py | 54 +++++++++---------- .../agent_framework/_workflows/_events.py | 21 ++++++++ .../tests/workflow/test_workflow_agent.py | 48 +++++++++-------- .../__init__.py | 4 ++ .../_base_group_chat_orchestrator.py | 10 ++-- .../_group_chat.py | 3 +- .../_handoff.py | 10 ++-- .../_sequential.py | 9 ++-- .../orchestrations/tests/test_group_chat.py | 53 +++++++++--------- .../orchestrations/tests/test_handoff.py | 11 ++-- .../orchestrations/tests/test_sequential.py | 29 +++++----- 12 files changed, 145 insertions(+), 109 deletions(-) diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 1cbcc7a8cb..2e06d26803 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -144,6 +144,7 @@ ) from ._workflows._edge_runner import create_edge_runner from ._workflows._events import ( + OrchestrationComplete, WorkflowErrorDetails, WorkflowEvent, WorkflowEventSource, @@ -261,6 +262,7 @@ "MiddlewareTermination", "MiddlewareType", "MiddlewareTypes", + "OrchestrationComplete", "OuterFinalT", "OuterUpdateT", "RawAgent", diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index f0a442f71d..8a492aa31b 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -32,6 +32,7 @@ from ..exceptions import AgentInvalidRequestException, AgentInvalidResponseException from ._checkpoint import CheckpointStorage from ._events import ( + OrchestrationComplete, WorkflowEvent, ) from ._message_utils import normalize_messages_input @@ -328,19 +329,11 @@ async def _run_stream_impl( session_messages: list[Message] = session_context.get_messages(include_input=True) all_updates: list[AgentResponseUpdate] = [] - emitted_message_ids: set[str] = set() async for event in self._run_core( session_messages, checkpoint_id, checkpoint_storage, streaming=True, **kwargs ): updates = self._convert_workflow_event_to_agent_response_updates(response_id, event) for update in updates: - # Deduplicate: orchestrations (e.g. HandoffBuilder) may yield the full - # conversation at termination, re-emitting messages that were already - # streamed individually. Skip updates whose message_id was already sent. - if update.message_id and update.message_id in emitted_message_ids: - continue - if update.message_id: - emitted_message_ids.add(update.message_id) all_updates.append(update) yield update @@ -457,7 +450,7 @@ def _convert_workflow_events_to_agent_response( raw_representations: list[object] = [] merged_usage: UsageDetails | None = None latest_created_at: str | None = None - seen_message_ids: set[str] = set() + orchestration_complete: OrchestrationComplete | None = None for output_event in output_events: if output_event.type == "request_info": @@ -474,6 +467,12 @@ def _convert_workflow_events_to_agent_response( raw_representations.append(output_event) else: data = output_event.data + if isinstance(data, OrchestrationComplete): + # Save for fallback: some orchestrations (e.g. group_chat) only emit + # OrchestrationComplete without separate per-agent output events. + orchestration_complete = data + continue + if isinstance(data, AgentResponseUpdate): # We cannot support AgentResponseUpdate in non-streaming mode. This is because the message # sequence cannot be guaranteed when there are streaming updates in between non-streaming @@ -484,14 +483,7 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - non_user_messages = [ - msg for msg in data.messages - if msg.role != "user" - and not (msg.message_id and msg.message_id in seen_message_ids) - ] - for msg in non_user_messages: - if msg.message_id: - seen_message_ids.add(msg.message_id) + non_user_messages = [msg for msg in data.messages if msg.role != "user"] messages.extend(non_user_messages) if non_user_messages: raw_representations.append(data.raw_representation) @@ -504,21 +496,12 @@ def _convert_workflow_events_to_agent_response( else latest_created_at ) elif isinstance(data, Message): - if data.role != "user" and not (data.message_id and data.message_id in seen_message_ids): - if data.message_id: - seen_message_ids.add(data.message_id) + if data.role != "user": messages.append(data) raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): chat_messages = cast(list[Message], data) - non_user_messages = [ - msg for msg in chat_messages - if msg.role != "user" - and not (msg.message_id and msg.message_id in seen_message_ids) - ] - for msg in non_user_messages: - if msg.message_id: - seen_message_ids.add(msg.message_id) + non_user_messages = [msg for msg in chat_messages if msg.role != "user"] messages.extend(non_user_messages) if non_user_messages: raw_representations.append(data) @@ -538,6 +521,16 @@ def _convert_workflow_events_to_agent_response( ) raw_representations.append(data) + # Fallback: if no messages were collected from other output events but an + # OrchestrationComplete was emitted, use its messages. This covers orchestrations + # (e.g. group_chat) where the orchestrator runs agents internally and only + # emits OrchestrationComplete as a workflow output. + if not messages and orchestration_complete is not None: + non_user_messages = [msg for msg in orchestration_complete.messages if msg.role != "user"] + messages.extend(non_user_messages) + if non_user_messages: + raw_representations.append(orchestration_complete) + return AgentResponse( messages=messages, response_id=response_id, @@ -600,6 +593,11 @@ def _convert_workflow_event_to_agent_response_updates( data = event.data executor_id = event.executor_id + if isinstance(data, OrchestrationComplete): + # OrchestrationComplete carries the full conversation for direct + # workflow consumers but should not be included in streaming output. + return [] + if isinstance(data, AgentResponseUpdate): # Pass through AgentResponseUpdate directly (streaming from AgentExecutor). # Filter user-role updates: orchestrations (e.g. HandoffBuilder) may emit the diff --git a/python/packages/core/agent_framework/_workflows/_events.py b/python/packages/core/agent_framework/_workflows/_events.py index c4694bf31b..d6740b5783 100644 --- a/python/packages/core/agent_framework/_workflows/_events.py +++ b/python/packages/core/agent_framework/_workflows/_events.py @@ -22,6 +22,27 @@ DataT = TypeVar("DataT", default=Any) +@dataclass +class OrchestrationComplete: + """Signals that an orchestration has finished and provides the full conversation. + + Orchestrations yield this at termination so direct workflow consumers can access + the complete conversation history. When the workflow is used via ``WorkflowAgent`` + (i.e. ``.as_agent()``), this event is filtered out and does not appear in the + ``AgentResponse`` or ``AgentResponseUpdate`` output. + """ + + messages: list[Any] + """The full conversation messages accumulated during the orchestration.""" + +if sys.version_info >= (3, 13): + from typing import TypeVar # type: ignore # pragma: no cover +else: + from typing_extensions import TypeVar # type: ignore[import] # pragma: no cover + +DataT = TypeVar("DataT", default=Any) + + class WorkflowEventSource(str, Enum): """Identifies whether a workflow event came from the framework or an executor. diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index a9cf884ef2..fffe19d342 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -16,6 +16,7 @@ Executor, InMemoryHistoryProvider, Message, + OrchestrationComplete, ResponseStream, SupportsAgentRun, UsageDetails, @@ -552,12 +553,12 @@ async def user_update_executor(messages: list[Message], ctx: WorkflowContext[Nev assert updates[0].role == "assistant" assert updates[0].contents[0].text == "assistant reply" - async def test_workflow_as_agent_deduplicates_streaming_messages(self) -> None: - """Test that duplicate messages are deduplicated in streaming mode. + async def test_workflow_as_agent_filters_orchestration_complete_streaming(self) -> None: + """Test that OrchestrationComplete events are filtered out in streaming mode. - Orchestrations like HandoffBuilder emit messages individually during streaming - then re-emit the full conversation at termination. The WorkflowAgent should - deduplicate so the caller doesn't see repeated messages. + Orchestrations emit OrchestrationComplete at termination to provide the full + conversation for direct workflow consumers. The WorkflowAgent should filter + these out so they don't appear as AgentResponseUpdate output. """ msg_id_1 = str(uuid.uuid4()) msg_id_2 = str(uuid.uuid4()) @@ -585,13 +586,14 @@ async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, An created_at="2026-01-01T00:00:01Z", ) ) - # Simulate termination: re-emit the full conversation as list[Message] - # (this is what HandoffBuilder._check_terminate_and_yield does) - await ctx.yield_output([ - Message(role="user", text="user input", message_id="user-msg-1"), - Message(role="assistant", text="first reply", message_id=msg_id_1), - Message(role="assistant", text="second reply", message_id=msg_id_2), - ]) + # Simulate termination: emit OrchestrationComplete with full conversation + await ctx.yield_output( + OrchestrationComplete([ + Message(role="user", text="user input", message_id="user-msg-1"), + Message(role="assistant", text="first reply", message_id=msg_id_1), + Message(role="assistant", text="second reply", message_id=msg_id_2), + ]) + ) workflow = WorkflowBuilder(start_executor=dedup_executor).build() agent = workflow.as_agent("dedup-agent") @@ -600,13 +602,13 @@ async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, An async for update in agent.run("test", stream=True): updates.append(update) - # Should have exactly 2 assistant updates — no duplicates from the list[Message] yield + # Should have exactly 2 assistant updates — OrchestrationComplete is filtered out assert len(updates) == 2 assert updates[0].contents[0].text == "first reply" assert updates[1].contents[0].text == "second reply" - async def test_workflow_as_agent_deduplicates_non_streaming_messages(self) -> None: - """Test that duplicate messages are deduplicated in non-streaming mode.""" + async def test_workflow_as_agent_filters_orchestration_complete_non_streaming(self) -> None: + """Test that OrchestrationComplete events are filtered out in non-streaming mode.""" msg_id_1 = str(uuid.uuid4()) msg_id_2 = str(uuid.uuid4()) @@ -629,19 +631,21 @@ async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, An response_id="resp-2", ) ) - # Re-emit the full conversation at termination - await ctx.yield_output([ - Message(role="user", text="user input", message_id="user-msg-1"), - Message(role="assistant", text="first reply", message_id=msg_id_1), - Message(role="assistant", text="second reply", message_id=msg_id_2), - ]) + # Emit OrchestrationComplete at termination + await ctx.yield_output( + OrchestrationComplete([ + Message(role="user", text="user input", message_id="user-msg-1"), + Message(role="assistant", text="first reply", message_id=msg_id_1), + Message(role="assistant", text="second reply", message_id=msg_id_2), + ]) + ) workflow = WorkflowBuilder(start_executor=dedup_executor).build() agent = workflow.as_agent("dedup-agent") result = await agent.run("test") - # Should have exactly 2 assistant messages — no duplicates from the list[Message] yield + # Should have exactly 2 assistant messages — OrchestrationComplete is filtered out assert len(result.messages) == 2 assert result.messages[0].text == "first reply" assert result.messages[1].text == "second reply" diff --git a/python/packages/orchestrations/agent_framework_orchestrations/__init__.py b/python/packages/orchestrations/agent_framework_orchestrations/__init__.py index d1acb7af53..95b6e7e1e9 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/__init__.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/__init__.py @@ -65,6 +65,9 @@ from ._orchestrator_helpers import clean_conversation_for_handoff, create_completion_message from ._sequential import SequentialBuilder +# Re-export OrchestrationComplete from core for convenience +from agent_framework import OrchestrationComplete + __all__ = [ "MAGENTIC_MANAGER_NAME", "ORCH_MSG_KIND_INSTRUCTION", @@ -101,6 +104,7 @@ "MagenticProgressLedgerItem", "MagenticResetSignal", "OrchestrationState", + "OrchestrationComplete", "SequentialBuilder", "StandardMagenticManager", "TerminationCondition", diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py b/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py index f01f3700f7..58786a2589 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py @@ -14,7 +14,7 @@ from agent_framework._types import Message from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse -from agent_framework._workflows._events import WorkflowEvent +from agent_framework._workflows._events import OrchestrationComplete, WorkflowEvent from agent_framework._workflows._executor import Executor, handler from agent_framework._workflows._workflow_context import WorkflowContext from typing_extensions import Never @@ -351,7 +351,7 @@ async def _check_termination(self) -> bool: result = await result return result - async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, OrchestrationComplete]) -> bool: """Check termination conditions and yield completion if met. Args: @@ -363,7 +363,7 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Mess terminate = await self._check_termination() if terminate: self._append_messages([self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)]) - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False @@ -490,7 +490,7 @@ def _check_round_limit(self) -> bool: return False - async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, OrchestrationComplete]) -> bool: """Check round limit and yield completion if reached. Args: @@ -502,7 +502,7 @@ async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Me reach_max_rounds = self._check_round_limit() if reach_max_rounds: self._append_messages([self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)]) - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py b/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py index a99e221409..2bc3bcca9c 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py @@ -33,6 +33,7 @@ from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage +from agent_framework._workflows._events import OrchestrationComplete from agent_framework._workflows._executor import Executor from agent_framework._workflows._workflow import Workflow from agent_framework._workflows._workflow_builder import WorkflowBuilder @@ -537,7 +538,7 @@ async def _check_agent_terminate_and_yield( agent_orchestration_output.final_message or "The conversation has been terminated by the agent." ) self._append_messages([self._create_completion_message(final_message)]) - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py index 5d6e84ef05..3066fa9c39 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py @@ -45,7 +45,7 @@ from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage -from agent_framework._workflows._events import WorkflowEvent +from agent_framework._workflows._events import OrchestrationComplete, WorkflowEvent from agent_framework._workflows._request_info_mixin import response_handler from agent_framework._workflows._workflow import Workflow from agent_framework._workflows._workflow_builder import WorkflowBuilder @@ -588,7 +588,9 @@ async def handle_response( If the response is empty, it indicates termination of the handoff workflow. """ if not response: - await cast(WorkflowContext[Never, list[Message]], ctx).yield_output(self._full_conversation) + await cast(WorkflowContext[Never, OrchestrationComplete], ctx).yield_output( + OrchestrationComplete(self._full_conversation) + ) return # Broadcast the user response to all other agents @@ -647,7 +649,7 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None: return None - async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, OrchestrationComplete]) -> bool: """Check termination conditions and yield completion if met. Args: @@ -664,7 +666,7 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Mess terminated = await terminated if terminated: - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py b/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py index 5ef4f7fe8c..e70b62a9d8 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py @@ -47,6 +47,7 @@ ) from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage +from agent_framework._workflows._events import OrchestrationComplete from agent_framework._workflows._executor import ( Executor, handler, @@ -84,25 +85,25 @@ class _EndWithConversation(Executor): async def end_with_messages( self, conversation: list[Message], - ctx: WorkflowContext[Any, list[Message]], + ctx: WorkflowContext[Any, OrchestrationComplete], ) -> None: """Handler for ending with a list of Message. This is used when the last participant is a custom executor. """ - await ctx.yield_output(list(conversation)) + await ctx.yield_output(OrchestrationComplete(list(conversation))) @handler async def end_with_agent_executor_response( self, response: AgentExecutorResponse, - ctx: WorkflowContext[Any, list[Message] | None], + ctx: WorkflowContext[Any, OrchestrationComplete], ) -> None: """Handle case where last participant is an agent. The agent is wrapped by AgentExecutor and emits AgentExecutorResponse. """ - await ctx.yield_output(response.full_conversation) + await ctx.yield_output(OrchestrationComplete(response.full_conversation)) class SequentialBuilder: diff --git a/python/packages/orchestrations/tests/test_group_chat.py b/python/packages/orchestrations/tests/test_group_chat.py index 7550f820c7..b67f5645f9 100644 --- a/python/packages/orchestrations/tests/test_group_chat.py +++ b/python/packages/orchestrations/tests/test_group_chat.py @@ -15,6 +15,7 @@ ChatResponseUpdate, Content, Message, + OrchestrationComplete, WorkflowEvent, WorkflowRunState, ) @@ -234,8 +235,8 @@ async def test_group_chat_builder_basic_flow() -> None: async for event in workflow.run("coordinate task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 assert len(outputs[0]) >= 1 @@ -279,8 +280,8 @@ async def test_agent_manager_handles_concatenated_json_output() -> None: async for event in workflow.run("coordinate task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert outputs conversation = outputs[-1] @@ -396,8 +397,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) # Should have terminated due to max_rounds, expect at least one output assert len(outputs) >= 1 @@ -429,8 +430,8 @@ def termination_condition(conversation: list[Message]) -> bool: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert outputs, "Expected termination to yield output" conversation = outputs[-1] @@ -455,8 +456,8 @@ async def test_termination_condition_agent_manager_finalizes(self) -> None: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert outputs, "Expected termination to yield output" conversation = outputs[-1] @@ -501,8 +502,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 # Should complete normally @@ -542,8 +543,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test string", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 @@ -565,8 +566,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run(task_message, stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 @@ -591,8 +592,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run(conversation, stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 @@ -621,8 +622,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) # Should have at least one output (the round limit message) assert len(outputs) >= 1 @@ -654,8 +655,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) # Should have at least one output (the round limit message) assert len(outputs) >= 1 @@ -679,7 +680,7 @@ async def test_group_chat_checkpoint_runtime_only() -> None: baseline_output: list[Message] | None = None async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): if ev.type == "output": - baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore + baseline_output = ev.data.messages if isinstance(ev.data, OrchestrationComplete) else None # type: ignore if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, @@ -715,7 +716,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: baseline_output: list[Message] | None = None async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True): if ev.type == "output": - baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore + baseline_output = ev.data.messages if isinstance(ev.data, OrchestrationComplete) else None # type: ignore if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, @@ -964,10 +965,10 @@ def agent_factory() -> Agent: assert len(outputs) == 1 # The DynamicManagerAgent terminates after second call with final_message final_messages = outputs[0].data - assert isinstance(final_messages, list) + assert isinstance(final_messages, OrchestrationComplete) assert any( msg.text == "dynamic manager final" - for msg in cast(list[Message], final_messages) + for msg in final_messages.messages if msg.author_name == "dynamic_manager" ) diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index f2f255dc5a..3f6be4e572 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -14,6 +14,7 @@ ChatResponseUpdate, Content, Message, + OrchestrationComplete, ResponseStream, WorkflowEvent, resolve_agent_id, @@ -828,8 +829,8 @@ async def test_autonomous_mode_yields_output_without_user_request(): assert outputs, "Autonomous mode should yield a workflow output" final_conversation = outputs[-1].data - assert isinstance(final_conversation, list) - conversation_list = cast(list[Message], final_conversation) + assert isinstance(final_conversation, OrchestrationComplete) + conversation_list = final_conversation.messages assert any(msg.role == "assistant" and (msg.text or "").startswith("specialist reply") for msg in conversation_list) @@ -898,8 +899,8 @@ async def async_termination(conv: list[Message]) -> bool: assert len(outputs) == 1 final_conversation = outputs[0].data - assert isinstance(final_conversation, list) - final_conv_list = cast(list[Message], final_conversation) + assert isinstance(final_conversation, OrchestrationComplete) + final_conv_list = final_conversation.messages user_messages = [msg for msg in final_conv_list if msg.role == "user"] assert len(user_messages) == 2 assert termination_call_count > 0 @@ -956,7 +957,7 @@ async def _get() -> ChatResponse: outputs = [event for event in events if event.type == "output"] assert outputs - conversation_outputs = [event for event in outputs if isinstance(event.data, list)] + conversation_outputs = [event for event in outputs if isinstance(event.data, OrchestrationComplete)] assert len(conversation_outputs) == 1 diff --git a/python/packages/orchestrations/tests/test_sequential.py b/python/packages/orchestrations/tests/test_sequential.py index 67bcc1bb9e..33bdf091d7 100644 --- a/python/packages/orchestrations/tests/test_sequential.py +++ b/python/packages/orchestrations/tests/test_sequential.py @@ -13,6 +13,7 @@ Content, Executor, Message, + OrchestrationComplete, TypeCompatibilityError, WorkflowContext, WorkflowRunState, @@ -84,7 +85,7 @@ async def test_sequential_agents_append_to_context() -> None: wf = SequentialBuilder(participants=[a1, a2]).build() completed = False - output: list[Message] | None = None + output: OrchestrationComplete | None = None async for ev in wf.run("hello sequential", stream=True): if ev.type == "status" and ev.state == WorkflowRunState.IDLE: completed = True @@ -95,8 +96,8 @@ async def test_sequential_agents_append_to_context() -> None: assert completed assert output is not None - assert isinstance(output, list) - msgs: list[Message] = output + assert isinstance(output, OrchestrationComplete) + msgs: list[Message] = output.messages assert len(msgs) == 3 assert msgs[0].role == "user" and "hello sequential" in msgs[0].text assert msgs[1].role == "assistant" and (msgs[1].author_name == "A1" or True) @@ -112,7 +113,7 @@ async def test_sequential_with_custom_executor_summary() -> None: wf = SequentialBuilder(participants=[a1, summarizer]).build() completed = False - output: list[Message] | None = None + output: OrchestrationComplete | None = None async for ev in wf.run("topic X", stream=True): if ev.type == "status" and ev.state == WorkflowRunState.IDLE: completed = True @@ -123,7 +124,7 @@ async def test_sequential_with_custom_executor_summary() -> None: assert completed assert output is not None - msgs: list[Message] = output + msgs: list[Message] = output.messages # Expect: [user, A1 reply, summary] assert len(msgs) == 3 assert msgs[0].role == "user" @@ -137,7 +138,7 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: initial_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(initial_agents), checkpoint_storage=storage).build() - baseline_output: list[Message] | None = None + baseline_output: OrchestrationComplete | None = None async for ev in wf.run("checkpoint sequential", stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] @@ -154,7 +155,7 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf_resume = SequentialBuilder(participants=list(resumed_agents), checkpoint_storage=storage).build() - resumed_output: list[Message] | None = None + resumed_output: OrchestrationComplete | None = None async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True): if ev.type == "output": resumed_output = ev.data # type: ignore[assignment] @@ -165,8 +166,8 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: break assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages] + assert [m.text for m in resumed_output.messages] == [m.text for m in baseline_output.messages] async def test_sequential_checkpoint_runtime_only() -> None: @@ -176,7 +177,7 @@ async def test_sequential_checkpoint_runtime_only() -> None: agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(agents)).build() - baseline_output: list[Message] | None = None + baseline_output: OrchestrationComplete | None = None async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] @@ -193,7 +194,7 @@ async def test_sequential_checkpoint_runtime_only() -> None: resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf_resume = SequentialBuilder(participants=list(resumed_agents)).build() - resumed_output: list[Message] | None = None + resumed_output: OrchestrationComplete | None = None async for ev in wf_resume.run( checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True ): @@ -206,8 +207,8 @@ async def test_sequential_checkpoint_runtime_only() -> None: break assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages] + assert [m.text for m in resumed_output.messages] == [m.text for m in baseline_output.messages] async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: @@ -223,7 +224,7 @@ async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(agents), checkpoint_storage=buildtime_storage).build() - baseline_output: list[Message] | None = None + baseline_output: OrchestrationComplete | None = None async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] From c53b2013a2f2174e6d5ff86f25f5bdea41143c47 Mon Sep 17 00:00:00 2001 From: alliscode Date: Thu, 5 Mar 2026 14:15:40 -0800 Subject: [PATCH 6/6] fix: preserve full conversation from OrchestrationComplete fallback Remove user-message filtering from the OrchestrationComplete fallback path so the full conversation (including inter-agent user messages) is retained in the AgentResponse. --- python/packages/core/agent_framework/_workflows/_agent.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 8a492aa31b..0e19c9140e 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -526,9 +526,8 @@ def _convert_workflow_events_to_agent_response( # (e.g. group_chat) where the orchestrator runs agents internally and only # emits OrchestrationComplete as a workflow output. if not messages and orchestration_complete is not None: - non_user_messages = [msg for msg in orchestration_complete.messages if msg.role != "user"] - messages.extend(non_user_messages) - if non_user_messages: + messages.extend(orchestration_complete.messages) + if orchestration_complete.messages: raw_representations.append(orchestration_complete) return AgentResponse(