diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 1cbcc7a8cb..2e06d26803 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -144,6 +144,7 @@ ) from ._workflows._edge_runner import create_edge_runner from ._workflows._events import ( + OrchestrationComplete, WorkflowErrorDetails, WorkflowEvent, WorkflowEventSource, @@ -261,6 +262,7 @@ "MiddlewareTermination", "MiddlewareType", "MiddlewareTypes", + "OrchestrationComplete", "OuterFinalT", "OuterUpdateT", "RawAgent", diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index bf615814b3..0e19c9140e 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -32,6 +32,7 @@ from ..exceptions import AgentInvalidRequestException, AgentInvalidResponseException from ._checkpoint import CheckpointStorage from ._events import ( + OrchestrationComplete, WorkflowEvent, ) from ._message_utils import normalize_messages_input @@ -449,6 +450,7 @@ def _convert_workflow_events_to_agent_response( raw_representations: list[object] = [] merged_usage: UsageDetails | None = None latest_created_at: str | None = None + orchestration_complete: OrchestrationComplete | None = None for output_event in output_events: if output_event.type == "request_info": @@ -465,6 +467,12 @@ def _convert_workflow_events_to_agent_response( raw_representations.append(output_event) else: data = output_event.data + if isinstance(data, OrchestrationComplete): + # Save for fallback: some orchestrations (e.g. group_chat) only emit + # OrchestrationComplete without separate per-agent output events. + orchestration_complete = data + continue + if isinstance(data, AgentResponseUpdate): # We cannot support AgentResponseUpdate in non-streaming mode. This is because the message # sequence cannot be guaranteed when there are streaming updates in between non-streaming @@ -475,8 +483,10 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - messages.extend(data.messages) - raw_representations.append(data.raw_representation) + non_user_messages = [msg for msg in data.messages if msg.role != "user"] + messages.extend(non_user_messages) + if non_user_messages: + raw_representations.append(data.raw_representation) merged_usage = add_usage_details(merged_usage, data.usage_details) latest_created_at = ( data.created_at @@ -486,12 +496,15 @@ def _convert_workflow_events_to_agent_response( else latest_created_at ) elif isinstance(data, Message): - messages.append(data) - raw_representations.append(data.raw_representation) + if data.role != "user": + messages.append(data) + raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): chat_messages = cast(list[Message], data) - messages.extend(chat_messages) - raw_representations.append(data) + non_user_messages = [msg for msg in chat_messages if msg.role != "user"] + messages.extend(non_user_messages) + if non_user_messages: + raw_representations.append(data) else: contents = self._extract_contents(data) if not contents: @@ -508,6 +521,15 @@ def _convert_workflow_events_to_agent_response( ) raw_representations.append(data) + # Fallback: if no messages were collected from other output events but an + # OrchestrationComplete was emitted, use its messages. This covers orchestrations + # (e.g. group_chat) where the orchestrator runs agents internally and only + # emits OrchestrationComplete as a workflow output. + if not messages and orchestration_complete is not None: + messages.extend(orchestration_complete.messages) + if orchestration_complete.messages: + raw_representations.append(orchestration_complete) + return AgentResponse( messages=messages, response_id=response_id, @@ -570,8 +592,17 @@ def _convert_workflow_event_to_agent_response_updates( data = event.data executor_id = event.executor_id + if isinstance(data, OrchestrationComplete): + # OrchestrationComplete carries the full conversation for direct + # workflow consumers but should not be included in streaming output. + return [] + if isinstance(data, AgentResponseUpdate): - # Pass through AgentResponseUpdate directly (streaming from AgentExecutor) + # Pass through AgentResponseUpdate directly (streaming from AgentExecutor). + # Filter user-role updates: orchestrations (e.g. HandoffBuilder) may emit the + # full conversation including user messages, which should not be echoed back. + if data.role == "user": + return [] if not data.author_name: data.author_name = executor_id return [data] @@ -579,6 +610,8 @@ def _convert_workflow_event_to_agent_response_updates( # Convert each message in AgentResponse to an AgentResponseUpdate updates: list[AgentResponseUpdate] = [] for msg in data.messages: + if msg.role == "user": + continue updates.append( AgentResponseUpdate( contents=list(msg.contents), @@ -593,6 +626,8 @@ def _convert_workflow_event_to_agent_response_updates( ) return updates if isinstance(data, Message): + if data.role == "user": + return [] return [ AgentResponseUpdate( contents=list(data.contents), @@ -609,6 +644,8 @@ def _convert_workflow_event_to_agent_response_updates( chat_messages = cast(list[Message], data) updates = [] for msg in chat_messages: + if msg.role == "user": + continue updates.append( AgentResponseUpdate( contents=list(msg.contents), diff --git a/python/packages/core/agent_framework/_workflows/_events.py b/python/packages/core/agent_framework/_workflows/_events.py index c4694bf31b..d6740b5783 100644 --- a/python/packages/core/agent_framework/_workflows/_events.py +++ b/python/packages/core/agent_framework/_workflows/_events.py @@ -22,6 +22,27 @@ DataT = TypeVar("DataT", default=Any) +@dataclass +class OrchestrationComplete: + """Signals that an orchestration has finished and provides the full conversation. + + Orchestrations yield this at termination so direct workflow consumers can access + the complete conversation history. When the workflow is used via ``WorkflowAgent`` + (i.e. ``.as_agent()``), this event is filtered out and does not appear in the + ``AgentResponse`` or ``AgentResponseUpdate`` output. + """ + + messages: list[Any] + """The full conversation messages accumulated during the orchestration.""" + +if sys.version_info >= (3, 13): + from typing import TypeVar # type: ignore # pragma: no cover +else: + from typing_extensions import TypeVar # type: ignore[import] # pragma: no cover + +DataT = TypeVar("DataT", default=Any) + + class WorkflowEventSource(str, Enum): """Identifies whether a workflow event came from the framework or an executor. diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index b5a8bb9902..fffe19d342 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -16,6 +16,7 @@ Executor, InMemoryHistoryProvider, Message, + OrchestrationComplete, ResponseStream, SupportsAgentRun, UsageDetails, @@ -469,10 +470,11 @@ async def raw_yielding_executor( assert updates[2].raw_representation.value == 42 async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None: - """Test that yield_output with list[Message] extracts contents from all messages. + """Test that yield_output with list[Message] extracts contents from non-user messages. - Note: Content items are coalesced by _finalize_response, so multiple text contents - become a single merged Content in the final response. + User-role messages are filtered out since agent responses should only contain + assistant/tool output. Content items are coalesced by _finalize_response, so + multiple text contents become a single merged Content in the final response. """ @executor @@ -491,25 +493,201 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N workflow = WorkflowBuilder(start_executor=list_yielding_executor).build() agent = workflow.as_agent("list-msg-agent") - # Verify streaming returns the update with all 4 contents before coalescing + # Verify streaming returns updates for non-user messages only updates: list[AgentResponseUpdate] = [] async for update in agent.run("test", stream=True): updates.append(update) - assert len(updates) == 3 + assert len(updates) == 2 full_response = AgentResponse.from_updates(updates) - assert len(full_response.messages) == 3 + assert len(full_response.messages) == 2 texts = [message.text for message in full_response.messages] - # Note: `from_agent_run_response_updates` coalesces multiple text contents into one content - assert texts == ["first message", "second message", "thirdfourth"] + # Note: AgentResponse.from_updates coalesces multiple text contents into a single merged Content + assert texts == ["second message", "thirdfourth"] # Verify run() result = await agent.run("test") assert isinstance(result, AgentResponse) - assert len(result.messages) == 3 + assert len(result.messages) == 2 texts = [message.text for message in result.messages] - assert texts == ["first message", "second message", "third fourth"] + assert texts == ["second message", "third fourth"] + + async def test_workflow_as_agent_filters_user_role_agent_response_update(self) -> None: + """Test that AgentResponseUpdate with role='user' is dropped from the stream.""" + + @executor + async def user_update_executor(messages: list[Message], ctx: WorkflowContext[Never, AgentResponseUpdate]) -> None: + # Emit a user-role AgentResponseUpdate directly + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="echoed user input")], + role="user", + author_name="test", + response_id="resp-1", + message_id="msg-1", + created_at="2026-01-01T00:00:00Z", + ) + ) + # Emit a valid assistant-role update + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="assistant reply")], + role="assistant", + author_name="test", + response_id="resp-1", + message_id="msg-2", + created_at="2026-01-01T00:00:00Z", + ) + ) + + workflow = WorkflowBuilder(start_executor=user_update_executor).build() + agent = workflow.as_agent("user-update-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("test", stream=True): + updates.append(update) + + # User-role update should be filtered out + assert len(updates) == 1 + assert updates[0].role == "assistant" + assert updates[0].contents[0].text == "assistant reply" + + async def test_workflow_as_agent_filters_orchestration_complete_streaming(self) -> None: + """Test that OrchestrationComplete events are filtered out in streaming mode. + + Orchestrations emit OrchestrationComplete at termination to provide the full + conversation for direct workflow consumers. The WorkflowAgent should filter + these out so they don't appear as AgentResponseUpdate output. + """ + msg_id_1 = str(uuid.uuid4()) + msg_id_2 = str(uuid.uuid4()) + + @executor + async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None: + # Simulate streaming: emit individual AgentResponseUpdate messages + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="first reply")], + role="assistant", + author_name="agent-a", + response_id="resp-1", + message_id=msg_id_1, + created_at="2026-01-01T00:00:00Z", + ) + ) + await ctx.yield_output( + AgentResponseUpdate( + contents=[Content.from_text(text="second reply")], + role="assistant", + author_name="agent-b", + response_id="resp-2", + message_id=msg_id_2, + created_at="2026-01-01T00:00:01Z", + ) + ) + # Simulate termination: emit OrchestrationComplete with full conversation + await ctx.yield_output( + OrchestrationComplete([ + Message(role="user", text="user input", message_id="user-msg-1"), + Message(role="assistant", text="first reply", message_id=msg_id_1), + Message(role="assistant", text="second reply", message_id=msg_id_2), + ]) + ) + + workflow = WorkflowBuilder(start_executor=dedup_executor).build() + agent = workflow.as_agent("dedup-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("test", stream=True): + updates.append(update) + + # Should have exactly 2 assistant updates — OrchestrationComplete is filtered out + assert len(updates) == 2 + assert updates[0].contents[0].text == "first reply" + assert updates[1].contents[0].text == "second reply" + + async def test_workflow_as_agent_filters_orchestration_complete_non_streaming(self) -> None: + """Test that OrchestrationComplete events are filtered out in non-streaming mode.""" + msg_id_1 = str(uuid.uuid4()) + msg_id_2 = str(uuid.uuid4()) + + @executor + async def dedup_executor(messages: list[Message], ctx: WorkflowContext[Never, Any]) -> None: + # Emit individual AgentResponse with messages + await ctx.yield_output( + AgentResponse( + messages=[ + Message(role="assistant", text="first reply", message_id=msg_id_1), + ], + response_id="resp-1", + ) + ) + await ctx.yield_output( + AgentResponse( + messages=[ + Message(role="assistant", text="second reply", message_id=msg_id_2), + ], + response_id="resp-2", + ) + ) + # Emit OrchestrationComplete at termination + await ctx.yield_output( + OrchestrationComplete([ + Message(role="user", text="user input", message_id="user-msg-1"), + Message(role="assistant", text="first reply", message_id=msg_id_1), + Message(role="assistant", text="second reply", message_id=msg_id_2), + ]) + ) + + workflow = WorkflowBuilder(start_executor=dedup_executor).build() + agent = workflow.as_agent("dedup-agent") + + result = await agent.run("test") + + # Should have exactly 2 assistant messages — OrchestrationComplete is filtered out + assert len(result.messages) == 2 + assert result.messages[0].text == "first reply" + assert result.messages[1].text == "second reply" + + async def test_workflow_as_agent_agent_response_raw_repr_consistency(self) -> None: + """Test that AgentResponse with only user messages does not add orphan raw_representations.""" + + @executor + async def mixed_response_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + # Emit an AgentResponse with only user messages + await ctx.yield_output( + AgentResponse( + messages=[Message(role="user", text="user only")], + response_id="resp-user-only", + ) + ) + # Emit an AgentResponse with mixed messages + await ctx.yield_output( + AgentResponse( + messages=[ + Message(role="user", text="user msg"), + Message(role="assistant", text="assistant msg"), + ], + response_id="resp-mixed", + ) + ) + + workflow = WorkflowBuilder(start_executor=mixed_response_executor).build() + agent = workflow.as_agent("mixed-response-agent") + + result = await agent.run("test") + + # Only the assistant message from the mixed response should appear + assert len(result.messages) == 1 + assert result.messages[0].text == "assistant msg" + + # raw_representation should only contain the mixed response's raw_representation, + # not the user-only response's (which produced no output messages) + assert isinstance(result.raw_representation, list) + assert len(result.raw_representation) == 1 async def test_session_conversation_history_included_in_workflow_run(self) -> None: """Test that messages provided to agent.run() are passed through to the workflow.""" diff --git a/python/packages/orchestrations/agent_framework_orchestrations/__init__.py b/python/packages/orchestrations/agent_framework_orchestrations/__init__.py index d1acb7af53..95b6e7e1e9 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/__init__.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/__init__.py @@ -65,6 +65,9 @@ from ._orchestrator_helpers import clean_conversation_for_handoff, create_completion_message from ._sequential import SequentialBuilder +# Re-export OrchestrationComplete from core for convenience +from agent_framework import OrchestrationComplete + __all__ = [ "MAGENTIC_MANAGER_NAME", "ORCH_MSG_KIND_INSTRUCTION", @@ -101,6 +104,7 @@ "MagenticProgressLedgerItem", "MagenticResetSignal", "OrchestrationState", + "OrchestrationComplete", "SequentialBuilder", "StandardMagenticManager", "TerminationCondition", diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py b/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py index f01f3700f7..58786a2589 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py @@ -14,7 +14,7 @@ from agent_framework._types import Message from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse -from agent_framework._workflows._events import WorkflowEvent +from agent_framework._workflows._events import OrchestrationComplete, WorkflowEvent from agent_framework._workflows._executor import Executor, handler from agent_framework._workflows._workflow_context import WorkflowContext from typing_extensions import Never @@ -351,7 +351,7 @@ async def _check_termination(self) -> bool: result = await result return result - async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, OrchestrationComplete]) -> bool: """Check termination conditions and yield completion if met. Args: @@ -363,7 +363,7 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Mess terminate = await self._check_termination() if terminate: self._append_messages([self._create_completion_message(self.TERMINATION_CONDITION_MET_MESSAGE)]) - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False @@ -490,7 +490,7 @@ def _check_round_limit(self) -> bool: return False - async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, OrchestrationComplete]) -> bool: """Check round limit and yield completion if reached. Args: @@ -502,7 +502,7 @@ async def _check_round_limit_and_yield(self, ctx: WorkflowContext[Never, list[Me reach_max_rounds = self._check_round_limit() if reach_max_rounds: self._append_messages([self._create_completion_message(self.MAX_ROUNDS_MET_MESSAGE)]) - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py b/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py index a99e221409..2bc3bcca9c 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py @@ -33,6 +33,7 @@ from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage +from agent_framework._workflows._events import OrchestrationComplete from agent_framework._workflows._executor import Executor from agent_framework._workflows._workflow import Workflow from agent_framework._workflows._workflow_builder import WorkflowBuilder @@ -537,7 +538,7 @@ async def _check_agent_terminate_and_yield( agent_orchestration_output.final_message or "The conversation has been terminated by the agent." ) self._append_messages([self._create_completion_message(final_message)]) - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py index 5d6e84ef05..3066fa9c39 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py @@ -45,7 +45,7 @@ from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest, AgentExecutorResponse from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage -from agent_framework._workflows._events import WorkflowEvent +from agent_framework._workflows._events import OrchestrationComplete, WorkflowEvent from agent_framework._workflows._request_info_mixin import response_handler from agent_framework._workflows._workflow import Workflow from agent_framework._workflows._workflow_builder import WorkflowBuilder @@ -588,7 +588,9 @@ async def handle_response( If the response is empty, it indicates termination of the handoff workflow. """ if not response: - await cast(WorkflowContext[Never, list[Message]], ctx).yield_output(self._full_conversation) + await cast(WorkflowContext[Never, OrchestrationComplete], ctx).yield_output( + OrchestrationComplete(self._full_conversation) + ) return # Broadcast the user response to all other agents @@ -647,7 +649,7 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None: return None - async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Message]]) -> bool: + async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, OrchestrationComplete]) -> bool: """Check termination conditions and yield completion if met. Args: @@ -664,7 +666,7 @@ async def _check_terminate_and_yield(self, ctx: WorkflowContext[Never, list[Mess terminated = await terminated if terminated: - await ctx.yield_output(self._full_conversation) + await ctx.yield_output(OrchestrationComplete(self._full_conversation)) return True return False diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py b/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py index 5ef4f7fe8c..e70b62a9d8 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_sequential.py @@ -47,6 +47,7 @@ ) from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage +from agent_framework._workflows._events import OrchestrationComplete from agent_framework._workflows._executor import ( Executor, handler, @@ -84,25 +85,25 @@ class _EndWithConversation(Executor): async def end_with_messages( self, conversation: list[Message], - ctx: WorkflowContext[Any, list[Message]], + ctx: WorkflowContext[Any, OrchestrationComplete], ) -> None: """Handler for ending with a list of Message. This is used when the last participant is a custom executor. """ - await ctx.yield_output(list(conversation)) + await ctx.yield_output(OrchestrationComplete(list(conversation))) @handler async def end_with_agent_executor_response( self, response: AgentExecutorResponse, - ctx: WorkflowContext[Any, list[Message] | None], + ctx: WorkflowContext[Any, OrchestrationComplete], ) -> None: """Handle case where last participant is an agent. The agent is wrapped by AgentExecutor and emits AgentExecutorResponse. """ - await ctx.yield_output(response.full_conversation) + await ctx.yield_output(OrchestrationComplete(response.full_conversation)) class SequentialBuilder: diff --git a/python/packages/orchestrations/tests/test_group_chat.py b/python/packages/orchestrations/tests/test_group_chat.py index 7550f820c7..b67f5645f9 100644 --- a/python/packages/orchestrations/tests/test_group_chat.py +++ b/python/packages/orchestrations/tests/test_group_chat.py @@ -15,6 +15,7 @@ ChatResponseUpdate, Content, Message, + OrchestrationComplete, WorkflowEvent, WorkflowRunState, ) @@ -234,8 +235,8 @@ async def test_group_chat_builder_basic_flow() -> None: async for event in workflow.run("coordinate task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 assert len(outputs[0]) >= 1 @@ -279,8 +280,8 @@ async def test_agent_manager_handles_concatenated_json_output() -> None: async for event in workflow.run("coordinate task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert outputs conversation = outputs[-1] @@ -396,8 +397,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) # Should have terminated due to max_rounds, expect at least one output assert len(outputs) >= 1 @@ -429,8 +430,8 @@ def termination_condition(conversation: list[Message]) -> bool: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert outputs, "Expected termination to yield output" conversation = outputs[-1] @@ -455,8 +456,8 @@ async def test_termination_condition_agent_manager_finalizes(self) -> None: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert outputs, "Expected termination to yield output" conversation = outputs[-1] @@ -501,8 +502,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test task", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 # Should complete normally @@ -542,8 +543,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test string", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 @@ -565,8 +566,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run(task_message, stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 @@ -591,8 +592,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run(conversation, stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) assert len(outputs) == 1 @@ -621,8 +622,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) # Should have at least one output (the round limit message) assert len(outputs) >= 1 @@ -654,8 +655,8 @@ def selector(state: GroupChatState) -> str: async for event in workflow.run("test", stream=True): if event.type == "output": data = event.data - if isinstance(data, list): - outputs.append(cast(list[Message], data)) + if isinstance(data, OrchestrationComplete): + outputs.append(data.messages) # Should have at least one output (the round limit message) assert len(outputs) >= 1 @@ -679,7 +680,7 @@ async def test_group_chat_checkpoint_runtime_only() -> None: baseline_output: list[Message] | None = None async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): if ev.type == "output": - baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore + baseline_output = ev.data.messages if isinstance(ev.data, OrchestrationComplete) else None # type: ignore if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, @@ -715,7 +716,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: baseline_output: list[Message] | None = None async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True): if ev.type == "output": - baseline_output = cast(list[Message], ev.data) if isinstance(ev.data, list) else None # type: ignore + baseline_output = ev.data.messages if isinstance(ev.data, OrchestrationComplete) else None # type: ignore if ev.type == "status" and ev.state in ( WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, @@ -964,10 +965,10 @@ def agent_factory() -> Agent: assert len(outputs) == 1 # The DynamicManagerAgent terminates after second call with final_message final_messages = outputs[0].data - assert isinstance(final_messages, list) + assert isinstance(final_messages, OrchestrationComplete) assert any( msg.text == "dynamic manager final" - for msg in cast(list[Message], final_messages) + for msg in final_messages.messages if msg.author_name == "dynamic_manager" ) diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index 43c2f9153a..3f6be4e572 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -8,11 +8,13 @@ import pytest from agent_framework import ( Agent, + AgentResponseUpdate, BaseContextProvider, ChatResponse, ChatResponseUpdate, Content, Message, + OrchestrationComplete, ResponseStream, WorkflowEvent, resolve_agent_id, @@ -827,8 +829,8 @@ async def test_autonomous_mode_yields_output_without_user_request(): assert outputs, "Autonomous mode should yield a workflow output" final_conversation = outputs[-1].data - assert isinstance(final_conversation, list) - conversation_list = cast(list[Message], final_conversation) + assert isinstance(final_conversation, OrchestrationComplete) + conversation_list = final_conversation.messages assert any(msg.role == "assistant" and (msg.text or "").startswith("specialist reply") for msg in conversation_list) @@ -897,8 +899,8 @@ async def async_termination(conv: list[Message]) -> bool: assert len(outputs) == 1 final_conversation = outputs[0].data - assert isinstance(final_conversation, list) - final_conv_list = cast(list[Message], final_conversation) + assert isinstance(final_conversation, OrchestrationComplete) + final_conv_list = final_conversation.messages user_messages = [msg for msg in final_conv_list if msg.role == "user"] assert len(user_messages) == 2 assert termination_call_count > 0 @@ -955,7 +957,7 @@ async def _get() -> ChatResponse: outputs = [event for event in events if event.type == "output"] assert outputs - conversation_outputs = [event for event in outputs if isinstance(event.data, list)] + conversation_outputs = [event for event in outputs if isinstance(event.data, OrchestrationComplete)] assert len(conversation_outputs) == 1 @@ -1117,3 +1119,47 @@ def get_session(self, *, service_session_id, **kwargs): with pytest.raises(TypeError, match="Participants must be Agent instances"): HandoffBuilder().participants([fake]) + + +async def test_handoff_as_agent_run_stream_does_not_echo_user_input() -> None: + """WorkflowAgent wrapping a handoff workflow must not echo user input in streamed updates. + + When HandoffAgentExecutor emits the full conversation via ctx.yield_output() on + termination, user-role messages from that list should not appear as + AgentResponseUpdate items in the stream returned by WorkflowAgent.run(..., stream=True). + """ + agent = MockHandoffAgent(name="single_agent") + + workflow = ( + HandoffBuilder( + participants=[agent], + # Terminate immediately after the agent responds (user msg + assistant msg = 2). + termination_condition=lambda conv: len(conv) >= 2, + ) + .with_start_agent(agent) + .build() + ) + + workflow_agent = workflow.as_agent(name="test_workflow_agent") + + user_input = "Hi! Can you help me with something?" + updates: list[AgentResponseUpdate] = [] + async for update in workflow_agent.run(user_input, stream=True): + updates.append(update) + + assert updates, "Expected at least one streaming update" + + # The core assertion: no update should carry the user role. + user_role_updates = [u for u in updates if u.role == "user"] + assert not user_role_updates, ( + f"User input was echoed back in the stream as {len(user_role_updates)} update(s). " + "Expected only assistant-role updates." + ) + + # Also verify non-streaming path filters user messages + result = await workflow_agent.run(user_input) + user_role_messages = [m for m in result.messages if m.role == "user"] + assert not user_role_messages, ( + f"User input was echoed back in non-streaming result as {len(user_role_messages)} message(s). " + "Expected only assistant-role messages." + ) diff --git a/python/packages/orchestrations/tests/test_sequential.py b/python/packages/orchestrations/tests/test_sequential.py index 67bcc1bb9e..33bdf091d7 100644 --- a/python/packages/orchestrations/tests/test_sequential.py +++ b/python/packages/orchestrations/tests/test_sequential.py @@ -13,6 +13,7 @@ Content, Executor, Message, + OrchestrationComplete, TypeCompatibilityError, WorkflowContext, WorkflowRunState, @@ -84,7 +85,7 @@ async def test_sequential_agents_append_to_context() -> None: wf = SequentialBuilder(participants=[a1, a2]).build() completed = False - output: list[Message] | None = None + output: OrchestrationComplete | None = None async for ev in wf.run("hello sequential", stream=True): if ev.type == "status" and ev.state == WorkflowRunState.IDLE: completed = True @@ -95,8 +96,8 @@ async def test_sequential_agents_append_to_context() -> None: assert completed assert output is not None - assert isinstance(output, list) - msgs: list[Message] = output + assert isinstance(output, OrchestrationComplete) + msgs: list[Message] = output.messages assert len(msgs) == 3 assert msgs[0].role == "user" and "hello sequential" in msgs[0].text assert msgs[1].role == "assistant" and (msgs[1].author_name == "A1" or True) @@ -112,7 +113,7 @@ async def test_sequential_with_custom_executor_summary() -> None: wf = SequentialBuilder(participants=[a1, summarizer]).build() completed = False - output: list[Message] | None = None + output: OrchestrationComplete | None = None async for ev in wf.run("topic X", stream=True): if ev.type == "status" and ev.state == WorkflowRunState.IDLE: completed = True @@ -123,7 +124,7 @@ async def test_sequential_with_custom_executor_summary() -> None: assert completed assert output is not None - msgs: list[Message] = output + msgs: list[Message] = output.messages # Expect: [user, A1 reply, summary] assert len(msgs) == 3 assert msgs[0].role == "user" @@ -137,7 +138,7 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: initial_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(initial_agents), checkpoint_storage=storage).build() - baseline_output: list[Message] | None = None + baseline_output: OrchestrationComplete | None = None async for ev in wf.run("checkpoint sequential", stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] @@ -154,7 +155,7 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf_resume = SequentialBuilder(participants=list(resumed_agents), checkpoint_storage=storage).build() - resumed_output: list[Message] | None = None + resumed_output: OrchestrationComplete | None = None async for ev in wf_resume.run(checkpoint_id=resume_checkpoint.checkpoint_id, stream=True): if ev.type == "output": resumed_output = ev.data # type: ignore[assignment] @@ -165,8 +166,8 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: break assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages] + assert [m.text for m in resumed_output.messages] == [m.text for m in baseline_output.messages] async def test_sequential_checkpoint_runtime_only() -> None: @@ -176,7 +177,7 @@ async def test_sequential_checkpoint_runtime_only() -> None: agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(agents)).build() - baseline_output: list[Message] | None = None + baseline_output: OrchestrationComplete | None = None async for ev in wf.run("runtime checkpoint test", checkpoint_storage=storage, stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment] @@ -193,7 +194,7 @@ async def test_sequential_checkpoint_runtime_only() -> None: resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf_resume = SequentialBuilder(participants=list(resumed_agents)).build() - resumed_output: list[Message] | None = None + resumed_output: OrchestrationComplete | None = None async for ev in wf_resume.run( checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage, stream=True ): @@ -206,8 +207,8 @@ async def test_sequential_checkpoint_runtime_only() -> None: break assert resumed_output is not None - assert [m.role for m in resumed_output] == [m.role for m in baseline_output] - assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + assert [m.role for m in resumed_output.messages] == [m.role for m in baseline_output.messages] + assert [m.text for m in resumed_output.messages] == [m.text for m in baseline_output.messages] async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: @@ -223,7 +224,7 @@ async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) wf = SequentialBuilder(participants=list(agents), checkpoint_storage=buildtime_storage).build() - baseline_output: list[Message] | None = None + baseline_output: OrchestrationComplete | None = None async for ev in wf.run("override test", checkpoint_storage=runtime_storage, stream=True): if ev.type == "output": baseline_output = ev.data # type: ignore[assignment]