diff --git a/python/packages/core/agent_framework/_agents.py b/python/packages/core/agent_framework/_agents.py index a519796b17..8f477f9223 100644 --- a/python/packages/core/agent_framework/_agents.py +++ b/python/packages/core/agent_framework/_agents.py @@ -947,7 +947,11 @@ def _propagate_conversation_id(update: AgentResponseUpdate) -> AgentResponseUpda def _finalizer(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]: ctx = ctx_holder["ctx"] - rf = ctx.get("chat_options", {}).get("response_format") if ctx else (options.get("response_format") if options else None) + rf = ( + ctx.get("chat_options", {}).get("response_format") + if ctx + else (options.get("response_format") if options else None) + ) return self._finalize_response_updates(updates, response_format=rf) return ( diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index bf615814b3..5db2ecee6c 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -213,6 +213,31 @@ def run( ) return self._run_impl(messages, response_id, session, checkpoint_id, checkpoint_storage, **kwargs) + def _filter_messages(self, chat_messages: list[Message]) -> list[Message]: + """Return only the last meaningful non-user message from a list of messages. + + Args: + chat_messages: The conversation history or workflow output to filter. + + Returns: + A single-element list: the last non-user message with non-blank text, else the + last non-user message; or an empty list when no non-user message exists. + """ + if not chat_messages: + return [] + + for msg in reversed(chat_messages): + if msg.role != "user" and msg.text and msg.text.strip(): + return [msg] + # fallback: last non-user message + non_user = [m for m in reversed(chat_messages) if m.role != "user"][:1] + if not non_user: + logger.warning( + "_filter_messages: no non-user messages found in list[Message] output. 
" + "Returning empty list — this likely indicates an unexpected workflow termination state." + ) + return non_user + async def _run_impl( self, messages: AgentRunInputs, @@ -489,8 +514,10 @@ def _convert_workflow_events_to_agent_response( messages.append(data) raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): - chat_messages = cast(list[Message], data) + chat_messages = self._filter_messages(cast(list[Message], data)) messages.extend(chat_messages) + # raw_representations intentionally stores the original unfiltered list — + # it records what the workflow emitted, not what was surfaced to the caller. raw_representations.append(data) else: contents = self._extract_contents(data) @@ -606,7 +633,7 @@ def _convert_workflow_event_to_agent_response_updates( ] if is_instance_of(data, list[Message]): # Convert each Message to an AgentResponseUpdate - chat_messages = cast(list[Message], data) + chat_messages = self._filter_messages(cast(list[Message], data)) updates = [] for msg in chat_messages: updates.append( diff --git a/python/packages/core/tests/workflow/test_agent_executor.py b/python/packages/core/tests/workflow/test_agent_executor.py index db53868ee1..4a850db642 100644 --- a/python/packages/core/tests/workflow/test_agent_executor.py +++ b/python/packages/core/tests/workflow/test_agent_executor.py @@ -286,9 +286,7 @@ async def test_agent_executor_run_streaming_with_stream_kwarg_does_not_raise() - @pytest.mark.parametrize("reserved_kwarg", ["session", "stream", "messages"]) -async def test_prepare_agent_run_args_strips_reserved_kwargs( - reserved_kwarg: str, caplog: "LogCaptureFixture" -) -> None: +async def test_prepare_agent_run_args_strips_reserved_kwargs(reserved_kwarg: str, caplog: "LogCaptureFixture") -> None: """_prepare_agent_run_args must remove reserved kwargs and log a warning.""" raw = {reserved_kwarg: "should-be-stripped", "custom_key": "keep-me"} diff --git 
a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index d20d60ba3b..7fd59e7166 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -469,15 +469,20 @@ async def raw_yielding_executor( assert updates[2].raw_representation.value == 42 async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None: - """Test that yield_output with list[Message] extracts contents from all messages. + """Test that yield_output with list[Message] surfaces only the last assistant message. - Note: Content items are coalesced by _finalize_response, so multiple text contents - become a single merged Content in the final response. + When a workflow executor yields a list[Message] (as GroupChat orchestrators + do with self._full_conversation on termination), _filter_messages returns + only the last meaningful assistant message to avoid re-emitting user input + and replaying the full conversation history across turns. See #4261. + + Users who need intermediate agent responses can opt in via + intermediate_outputs=True in GroupChatBuilder. 
""" @executor async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[Never, list[Message]]) -> None: - # Yield a list of Messages (as SequentialBuilder does) + # Yield a list of Messages (as GroupChat orchestrator does with _full_conversation) msg_list = [ Message(role="user", text="first message"), Message(role="assistant", text="second message"), @@ -491,25 +496,24 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N workflow = WorkflowBuilder(start_executor=list_yielding_executor).build() agent = workflow.as_agent("list-msg-agent") - # Verify streaming returns the update with all 4 contents before coalescing + # Streaming: _filter_messages returns only the last meaningful assistant message updates: list[AgentResponseUpdate] = [] async for update in agent.run("test", stream=True): updates.append(update) - assert len(updates) == 3 + # Only the last assistant message should be surfaced (user messages filtered, + # earlier assistant messages treated as conversation history replay) + assert len(updates) == 1 full_response = AgentResponse.from_updates(updates) - assert len(full_response.messages) == 3 - texts = [message.text for message in full_response.messages] - # Note: `from_agent_run_response_updates` coalesces multiple text contents into one content - assert texts == ["first message", "second message", "thirdfourth"] + assert len(full_response.messages) == 1 + assert full_response.messages[0].text == "thirdfourth" - # Verify run() + # Non-streaming: same filtering applies result = await agent.run("test") assert isinstance(result, AgentResponse) - assert len(result.messages) == 3 - texts = [message.text for message in result.messages] - assert texts == ["first message", "second message", "third fourth"] + assert len(result.messages) == 1 + assert result.messages[0].text == "third fourth" async def test_session_conversation_history_included_in_workflow_run(self) -> None: """Test that messages provided to 
agent.run() are passed through to the workflow.""" @@ -1382,3 +1386,213 @@ def test_merge_updates_function_result_no_matching_call(self): # Order: text (user), text (assistant), function_result (orphan at end) assert content_types == ["text", "text", "function_result"] + + +class TestWorkflowAgentUserInputFilteringRegression: + """Regression tests for #4261: user input must not compound across successive turns. + + When a GroupChat orchestrator terminates, it yields self._full_conversation via + ctx.yield_output(self._full_conversation). This is a list[Message] containing the + entire conversation history — both user inputs and all prior assistant responses. + + Without filtering, WorkflowAgent's _convert_workflow_event_to_agent_response_updates + (streaming) and _convert_workflow_events_to_agent_response (non-streaming) forward + all messages verbatim, causing user inputs and earlier assistant responses to + accumulate in the output on every successive turn. + + _filter_messages fixes this by returning only the last meaningful assistant message + from list[Message] output, aligning with GroupChatBuilder's default behavior of + intermediate_outputs=False where only the orchestrator's final summary is surfaced. + Users who need intermediate agent responses can opt in via intermediate_outputs=True. + + These tests use a class-based Executor (rather than the @executor decorator) to + ensure generic type annotations on WorkflowContext[Never, list[Message]] resolve + correctly at runtime, so is_instance_of(data, list[Message]) hits the right branch. + """ + + async def test_streaming_compounding_not_observed_across_turns(self): + """Regression: turn 1's user input must not appear in turn 2's streamed response.""" + + class GroupChatLikeExecutor(Executor): + """Simulates a GroupChat orchestrator's termination behavior. 
+ + On termination, BaseGroupChatOrchestrator yields self._full_conversation + via ctx.yield_output(self._full_conversation), which is a list[Message] + containing the entire conversation history (both user and assistant messages). + This executor replicates that exact pattern to exercise the list[Message] + branch in _filter_messages and verify that user inputs do not compound + across successive turns (see #4261). + """ + + @handler + async def handle_messages( + self, + messages: list[Message], + ctx: WorkflowContext[Never, list[Message]], + ) -> None: + input_text = messages[-1].text or "" + full_conversation: list[Message] = [ + Message(role="user", text=input_text), + Message( + role="assistant", + contents=[Content.from_text(text=f"Answer to: {input_text}")], + author_name="Principal", + ), + ] + await ctx.yield_output(full_conversation) + + groupchat_executor = GroupChatLikeExecutor(id="groupchat") + workflow = WorkflowBuilder(start_executor=groupchat_executor).build() + agent = workflow.as_agent("groupchat-agent") + session = AgentSession() + + # Turn 1 + updates1: list[AgentResponseUpdate] = [] + async for chunk in agent.run("first_question", stream=True, session=session): + updates1.append(chunk) + + # Turn 2: "first_question" must NOT bleed into turn 2's streamed output + updates2: list[AgentResponseUpdate] = [] + async for chunk in agent.run("second_question", stream=True, session=session): + updates2.append(chunk) + + text2 = " ".join(u.text or "" for u in updates2 if u.text) + assert "first_question" not in text2, ( + "Turn 1 user input should not appear in turn 2 streaming output (compounding regression)" + ) + assert "Answer to: second_question" in text2 + + async def test_nonstreaming_compounding_not_observed_across_turns(self): + """Regression: turn 1's user input must not appear in turn 2's response.""" + + class GroupChatLikeExecutor(Executor): + """Simulates a GroupChat orchestrator's termination behavior. 
+ + On termination, BaseGroupChatOrchestrator yields self._full_conversation + via ctx.yield_output(self._full_conversation), which is a list[Message] + containing the entire conversation history (both user and assistant messages). + This executor replicates that exact pattern to exercise the list[Message] + branch in _filter_messages and verify that user inputs do not compound + across successive turns (see #4261). + """ + + @handler + async def handle_messages( + self, + messages: list[Message], + ctx: WorkflowContext[Never, list[Message]], + ) -> None: + input_text = messages[-1].text or "" + full_conversation: list[Message] = [ + Message(role="user", text=input_text), + Message( + role="assistant", + contents=[Content.from_text(text=f"Answer to: {input_text}")], + author_name="Principal", + ), + ] + await ctx.yield_output(full_conversation) + + groupchat_executor = GroupChatLikeExecutor(id="groupchat") + workflow = WorkflowBuilder(start_executor=groupchat_executor).build() + agent = workflow.as_agent("groupchat-agent") + session = AgentSession() + + # Turn 1 + await agent.run("first_question", session=session) + + # Turn 2: "first_question" must NOT bleed into turn 2's response + result2 = await agent.run("second_question", session=session) + text2 = " ".join(m.text or "" for m in result2.messages) + assert "first_question" not in text2, ( + "Turn 1 user input should not appear in turn 2 response (compounding regression)" + ) + assert "Answer to: second_question" in text2 + + +class TestFilterMessages: + """Direct unit tests for WorkflowAgent._filter_messages edge cases. + + Covers empty input, all-user messages, assistant messages with no/whitespace text, + mixed roles, and ordering. Empty input returns [] via the early + `if not chat_messages` guard; all-user input reaches the `if not non_user` branch, + which logs a warning and returns an empty list (see moonbox3's review on #4268). 
+ """ + + def _make_agent(self) -> WorkflowAgent: + @executor + async def _e(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output("x") + + workflow = WorkflowBuilder(start_executor=_e).build() + return WorkflowAgent(workflow=workflow) + + def test_empty_list_returns_empty(self): + agent = self._make_agent() + assert agent._filter_messages([]) == [] + + def test_single_assistant_message_empty_text(self): + """Return the single assistant message as-is when it's the only message, even if it has no text""" + agent = self._make_agent() + msg = Message(role="assistant", text="") + assert agent._filter_messages([msg]) == [msg] + + def test_single_assistant_message_whitespace_text(self): + """Return the single assistant message as-is when it's the only message, even if it has only whitespace.""" + agent = self._make_agent() + msg = Message(role="assistant", text=" ") + assert agent._filter_messages([msg]) == [msg] + + def test_single_assistant_message_none_text(self): + """Return the single assistant message as-is when it's the only message, even if it has None text.""" + agent = self._make_agent() + msg = Message(role="assistant", text=None) + assert agent._filter_messages([msg]) == [msg] + + def test_single_assistant_message_returned(self): + """Return the single assistant message as-is when it's the only message""" + agent = self._make_agent() + msg = Message(role="assistant", text="Hello") + assert agent._filter_messages([msg]) == [msg] + + def test_all_user_messages_returns_empty_list(self): + """All-user input: no assistant content exists to surface, returns empty list.""" + + agent = self._make_agent() + msgs = [Message(role="user", text="hi"), Message(role="user", text="hello")] + result = agent._filter_messages(msgs) + assert result == [] + + def test_mixed_roles_returns_last_assistant(self): + agent = self._make_agent() + msgs = [ + Message(role="user", text="q1"), + Message(role="assistant", text="a1"), + 
+ Message(role="user", text="q2"), + Message(role="assistant", text="a2"), # should be returned + ] + result = agent._filter_messages(msgs) + assert len(result) == 1 + assert result[0].text == "a2" + + def test_assistant_with_none_text_falls_through_to_next(self): + agent = self._make_agent() + msgs = [ + Message(role="assistant", text="a1"), + Message(role="assistant", text=None), + Message(role="assistant", text=" "), + ] + # Trailing None/whitespace-only messages fail the text check, so the earlier "a1" is returned + result = agent._filter_messages(msgs) + assert len(result) == 1 # the last message with non-blank text ("a1") + + def test_returns_last_not_first_assistant(self): + agent = self._make_agent() + msgs = [ + Message(role="assistant", text="First response"), + Message(role="user", text="follow up"), + Message(role="assistant", text="Second response"), + ] + result = agent._filter_messages(msgs) + assert len(result) == 1 + assert result[0].text == "Second response" diff --git a/python/packages/core/tests/workflow/test_workflow_kwargs.py b/python/packages/core/tests/workflow/test_workflow_kwargs.py index 379435e124..ce1465effc 100644 --- a/python/packages/core/tests/workflow/test_workflow_kwargs.py +++ b/python/packages/core/tests/workflow/test_workflow_kwargs.py @@ -499,9 +499,7 @@ async def _done() -> AgentResponse: # Continue with responses only — no new kwargs approval = request_events[0] - await workflow.run( - responses={approval.request_id: approval.data.to_function_approval_response(True)} - ) + await workflow.run(responses={approval.request_id: approval.data.to_function_approval_response(True)}) # Both calls should have received the original kwargs assert len(agent.captured_kwargs) == 2 diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py index 5d6e84ef05..557595c94f 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py +++ 
b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py @@ -252,7 +252,6 @@ def _prepare_agent_with_handoffs( Returns: A cloned ``Agent`` instance with handoff tools added """ - # Clone the agent to avoid mutating the original cloned_agent = self._clone_chat_agent(agent) # type: ignore # Add handoff tools to the cloned agent