From 80d93740107e044e2a8afc1075150f8439ed39e9 Mon Sep 17 00:00:00 2001 From: "L. Elaine Dazzio" Date: Wed, 25 Feb 2026 16:28:35 -0500 Subject: [PATCH 1/5] fix: filter non-assistant messages from AgentResponse in WorkflowAgent When a workflow (e.g., GroupChat) emits an AgentResponse containing the full conversation history (user + assistant messages), WorkflowAgent was converting ALL messages to output, causing user inputs to be re-emitted as assistant responses. This was visible as accumulated user inputs in each successive response. Fix: filter AgentResponse.messages to only include role="assistant" messages in both _convert_workflow_event_to_agent_response_updates (streaming) and _convert_workflow_events_to_agent_response (non-streaming). Fixes #4261 --- .../core/agent_framework/_workflows/_agent.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 3fb83803c4..15718a1e26 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -462,7 +462,11 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - messages.extend(data.messages) + # Filter to only assistant messages to avoid re-emitting user input + # that may be included in the AgentResponse's conversation history + # (e.g., from GroupChat orchestrators that include the full conversation). + assistant_messages = [msg for msg in data.messages if msg.role == "assistant"] + messages.extend(assistant_messages) raw_representations.append(data.raw_representation) merged_usage = add_usage_details(merged_usage, data.usage_details) latest_created_at = ( @@ -563,9 +567,14 @@ def _convert_workflow_event_to_agent_response_updates( data.author_name = executor_id return [data] if isinstance(data, AgentResponse): - # Convert each message in AgentResponse to an AgentResponseUpdate + # Convert each assistant message in AgentResponse to an AgentResponseUpdate. + # Filter out non-assistant messages to avoid re-emitting user input + # that may be included in the AgentResponse's conversation history + # (e.g., from GroupChat orchestrators that include the full conversation). updates: list[AgentResponseUpdate] = [] for msg in data.messages: + if msg.role != "assistant": + continue updates.append( AgentResponseUpdate( contents=list(msg.contents), From 43ba31f44420c0634e6633799e7001232739662c Mon Sep 17 00:00:00 2001 From: "L. Elaine Dazzio" Date: Wed, 25 Feb 2026 16:32:29 -0500 Subject: [PATCH 2/5] test: add tests for user input filtering in WorkflowAgent (#4261) Add TestWorkflowAgentUserInputFiltering class with tests verifying that: - Streaming filters user messages from AgentResponse data - Non-streaming filters user messages from AgentResponse data - All-assistant AgentResponse works unchanged (no regression) - All-user AgentResponse produces no updates --- .../tests/workflow/test_workflow_agent.py | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index b2fbded39b..fe0f4f4ffd 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -1296,3 +1296,142 @@ def test_merge_updates_function_result_no_matching_call(self): # Order: text (user), text (assistant), function_result (orphan at end) assert content_types == ["text", "text", "function_result"] + + +class TestWorkflowAgentUserInputFiltering: + """Test cases for filtering user input from AgentResponse in WorkflowAgent. + + Validates the fix for GitHub issue #4261: when a workflow (e.g., GroupChat) + emits an AgentResponse containing the full conversation history (user + + assistant messages), WorkflowAgent should only surface assistant messages + to avoid re-emitting user input. + """ + + async def test_streaming_agent_response_filters_user_messages(self): + """Test that streaming filters out user messages from AgentResponse data. + + When an executor yields an AgentResponse containing both user and assistant + messages (as GroupChat orchestrators do), only assistant messages should + appear in the streaming output. + """ + + @executor + async def groupchat_like_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + # Simulate a GroupChat-like executor that emits full conversation history + response = AgentResponse( + messages=[ + Message(role="user", text="hi"), + Message(role="assistant", text="Hello! How can I help?", author_name="Principal"), + Message(role="user", text="what is 2+2?"), + Message(role="assistant", text="2+2 = 4", author_name="Maths Teacher"), + Message(role="assistant", text="The answer is 4.", author_name="Principal"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() + agent = workflow.as_agent("groupchat-agent") + + # Collect streaming updates + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("hi", stream=True): + updates.append(update) + + # Should only have assistant messages (3 out of 5) + assert len(updates) == 3, f"Expected 3 assistant updates, got {len(updates)}" + + # Verify all updates are assistant role + for update in updates: + assert update.role == "assistant", f"Expected role='assistant', got role='{update.role}'" + + # Verify the content is correct + texts = [u.text for u in updates] + assert "Hello! How can I help?" in texts + assert "2+2 = 4" in texts + assert "The answer is 4." in texts + + # Verify user messages are NOT present + assert "hi" not in texts + assert "what is 2+2?" not in texts + + async def test_non_streaming_agent_response_filters_user_messages(self): + """Test that non-streaming also filters out user messages from AgentResponse data.""" + + @executor + async def groupchat_like_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="user", text="hi"), + Message(role="assistant", text="Hello!", author_name="Bot"), + Message(role="user", text="bye"), + Message(role="assistant", text="Goodbye!", author_name="Bot"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() + agent = workflow.as_agent("groupchat-agent") + + result = await agent.run("hi") + + # Should only have assistant messages (2 out of 4) + assert len(result.messages) == 2, f"Expected 2 messages, got {len(result.messages)}" + + texts = [msg.text for msg in result.messages] + assert texts == ["Hello!", "Goodbye!"] + + async def test_streaming_agent_response_all_assistant_unchanged(self): + """Test that AgentResponse with only assistant messages works correctly (no regression).""" + + @executor + async def assistant_only_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="assistant", text="First response"), + Message(role="assistant", text="Second response"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=assistant_only_executor).build() + agent = workflow.as_agent("assistant-only-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("test", stream=True): + updates.append(update) + + assert len(updates) == 2 + texts = [u.text for u in updates] + assert texts == ["First response", "Second response"] + + async def test_streaming_agent_response_empty_after_filtering(self): + """Test that AgentResponse with only user messages produces no updates.""" + + @executor + async def user_only_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + # Edge case: all messages are user role + response = AgentResponse( + messages=[ + Message(role="user", text="user msg 1"), + Message(role="user", text="user msg 2"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=user_only_executor).build() + agent = workflow.as_agent("user-only-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("test", stream=True): + updates.append(update) + + # No assistant messages means no updates + assert len(updates) == 0 From dc51ea87c1f4edbdd56f5801b2a900e8bb636f38 Mon Sep 17 00:00:00 2001 From: "L. Elaine Dazzio" Date: Thu, 26 Feb 2026 17:15:16 -0500 Subject: [PATCH 3/5] test: address moonbox3 review feedback on PR #4275 - Add author_name assertions to streaming filter test - Add test for system/tool role filtering - Add non-streaming edge case for empty-after-filtering --- .../test_workflow_agent_filtering_addl.py | 162 ++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 python/packages/core/tests/workflow/test_workflow_agent_filtering_addl.py diff --git a/python/packages/core/tests/workflow/test_workflow_agent_filtering_addl.py b/python/packages/core/tests/workflow/test_workflow_agent_filtering_addl.py new file mode 100644 index 0000000000..c5e9e0f9a3 --- /dev/null +++ b/python/packages/core/tests/workflow/test_workflow_agent_filtering_addl.py @@ -0,0 +1,162 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Additional test cases for WorkflowAgent user input filtering. + +These tests address review feedback from moonbox3 on PR #4275: +1. Verify author_name is preserved through filtering +2. Verify non-assistant, non-user roles (system, tool) are also filtered +3. Non-streaming edge case for empty-after-filtering +""" + +from typing_extensions import Never + +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + Message, + WorkflowBuilder, + WorkflowContext, + executor, +) + + +class TestWorkflowAgentUserInputFilteringAdditional: + """Additional filtering tests addressing PR #4275 review feedback.""" + + async def test_streaming_author_name_preserved_through_filtering(self): + """Test that author_name is correctly propagated through filtering. + + Addresses moonbox3's suggestion to verify that the production code's + `author_name=msg.author_name` mapping is preserved when non-assistant + messages are filtered out. + """ + + @executor + async def groupchat_like_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="user", text="hi"), + Message(role="assistant", text="Hello! How can I help?", author_name="Principal"), + Message(role="user", text="what is 2+2?"), + Message(role="assistant", text="2+2 = 4", author_name="Maths Teacher"), + Message(role="assistant", text="The answer is 4.", author_name="Principal"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() + agent = workflow.as_agent("groupchat-agent") + + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("hi", stream=True): + updates.append(update) + + # Should only have assistant messages (3 out of 5) + assert len(updates) == 3, f"Expected 3 assistant updates, got {len(updates)}" + + # Verify all updates are assistant role + for update in updates: + assert update.role == "assistant", f"Expected role='assistant', got role='{update.role}'" + + # Verify author_name is propagated correctly + assert updates[0].author_name == "Principal" + assert updates[1].author_name == "Maths Teacher" + assert updates[2].author_name == "Principal" + + async def test_streaming_filters_system_and_tool_roles(self): + """Test that non-assistant, non-user roles (system, tool) are also filtered. + + The filter uses an allowlist (role == "assistant") rather than a blocklist, + so system and tool messages should also be excluded from the output. + This is intentional: WorkflowAgent output should only contain assistant + messages. System prompts and tool results are internal workflow artifacts. + """ + + @executor + async def mixed_roles_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="system", text="You are a helpful assistant."), + Message(role="user", text="What is the weather?"), + Message(role="assistant", text="Let me check the weather for you."), + Message(role="tool", text="Weather API result: Sunny, 72°F"), + Message(role="assistant", text="It's sunny and 72°F today!"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=mixed_roles_executor).build() + agent = workflow.as_agent("mixed-roles-agent") + + # Test streaming path + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("weather?", stream=True): + updates.append(update) + + # Only assistant messages should pass through (2 out of 5) + assert len(updates) == 2, f"Expected 2 assistant updates, got {len(updates)}" + texts = [u.text for u in updates] + assert texts == ["Let me check the weather for you.", "It's sunny and 72°F today!"] + + # Verify system and tool messages are NOT present + for update in updates: + assert update.role == "assistant" + + async def test_non_streaming_filters_system_and_tool_roles(self): + """Test non-streaming path also filters system and tool roles.""" + + @executor + async def mixed_roles_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="system", text="System prompt"), + Message(role="user", text="Hello"), + Message(role="assistant", text="Hi there!"), + Message(role="tool", text="tool output"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=mixed_roles_executor).build() + agent = workflow.as_agent("mixed-roles-agent") + + result = await agent.run("hello") + + # Only the assistant message should remain + assert len(result.messages) == 1, f"Expected 1 message, got {len(result.messages)}" + assert result.messages[0].text == "Hi there!" + assert result.messages[0].role == "assistant" + + async def test_non_streaming_empty_after_filtering(self): + """Test non-streaming path handles AgentResponse with only non-assistant messages. + + Verifies that when all messages are filtered out in the non-streaming path, + the result contains an empty messages list without crashing. + """ + + @executor + async def non_assistant_only_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="user", text="user msg"), + Message(role="system", text="system msg"), + Message(role="tool", text="tool msg"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=non_assistant_only_executor).build() + agent = workflow.as_agent("non-assistant-only-agent") + + result = await agent.run("test") + + # All messages filtered out — should be empty, not crash + assert len(result.messages) == 0, f"Expected 0 messages, got {len(result.messages)}" From 575f6dccdff101a4686559c7b3f64bd9bb2708c1 Mon Sep 17 00:00:00 2001 From: LEDazzio01 <170764058+LEDazzio01@users.noreply.github.com> Date: Fri, 27 Feb 2026 20:22:12 -0500 Subject: [PATCH 4/5] Address review feedback: add author_name assertions and improve filter comments --- .../core/agent_framework/_workflows/_agent.py | 14 ++++++++------ .../core/tests/workflow/test_workflow_agent.py | 5 +++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 15718a1e26..36d67adf65 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -462,9 +462,10 @@ def _convert_workflow_events_to_agent_response( ) if isinstance(data, AgentResponse): - # Filter to only assistant messages to avoid re-emitting user input - # that may be included in the AgentResponse's conversation history - # (e.g., from GroupChat orchestrators that include the full conversation). + # Filter to only assistant messages — system, tool, and user messages + # are intentionally excluded. System prompts and tool results are + # internal workflow artifacts; user messages would be re-emitted + # (e.g., from GroupChat orchestrators that include full conversation history). assistant_messages = [msg for msg in data.messages if msg.role == "assistant"] messages.extend(assistant_messages) raw_representations.append(data.raw_representation) @@ -568,9 +569,10 @@ def _convert_workflow_event_to_agent_response_updates( return [data] if isinstance(data, AgentResponse): # Convert each assistant message in AgentResponse to an AgentResponseUpdate. - # Filter out non-assistant messages to avoid re-emitting user input - # that may be included in the AgentResponse's conversation history - # (e.g., from GroupChat orchestrators that include the full conversation). + # Filter out non-assistant messages — system, tool, and user messages + # are intentionally excluded. System prompts and tool results are + # internal workflow artifacts; user messages would be re-emitted + # (e.g., from GroupChat orchestrators that include full conversation history). updates: list[AgentResponseUpdate] = [] for msg in data.messages: if msg.role != "assistant": diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index fe0f4f4ffd..a7856a8486 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -1346,6 +1346,11 @@ async def groupchat_like_executor( for update in updates: assert update.role == "assistant", f"Expected role='assistant', got role='{update.role}'" + # Verify author_name is propagated correctly + assert updates[0].author_name == "Principal" + assert updates[1].author_name == "Maths Teacher" + assert updates[2].author_name == "Principal" + # Verify the content is correct texts = [u.text for u in updates] assert "Hello! How can I help?" in texts From d6ba0f9e87ebde7909e36025f34f76e4ca644a98 Mon Sep 17 00:00:00 2001 From: LEDazzio01 <170764058+LEDazzio01@users.noreply.github.com> Date: Sat, 28 Feb 2026 18:39:02 -0500 Subject: [PATCH 5/5] test: add system/tool role filtering and non-streaming empty-after-filtering tests --- .../tests/workflow/test_workflow_agent.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index a7856a8486..d4c82f708f 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -1440,3 +1440,77 @@ async def user_only_executor( # No assistant messages means no updates assert len(updates) == 0 + + async def test_streaming_agent_response_filters_system_and_tool_roles(self): + """Test that system and tool role messages are also filtered out. + + The allowlist filter (role == "assistant") intentionally drops all non-assistant + roles, including "system" and "tool". This test documents that behavior explicitly + to prevent future regressions if someone expects those messages to pass through. + """ + + @executor + async def mixed_role_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="system", text="You are a helpful assistant."), + Message(role="user", text="hi"), + Message(role="assistant", text="Hello!", author_name="Bot"), + Message(role="tool", text="tool result data"), + Message(role="assistant", text="Based on the tool result...", author_name="Bot"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=mixed_role_executor).build() + agent = workflow.as_agent("mixed-role-agent") + + # Streaming path + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("hi", stream=True): + updates.append(update) + + # Only assistant messages should survive (2 out of 5) + assert len(updates) == 2, f"Expected 2 assistant updates, got {len(updates)}" + for update in updates: + assert update.role == "assistant" + texts = [u.text for u in updates] + assert texts == ["Hello!", "Based on the tool result..."] + + # Non-streaming path + workflow2 = WorkflowBuilder(start_executor=mixed_role_executor).build() + agent2 = workflow2.as_agent("mixed-role-agent-2") + result = await agent2.run("hi") + + assert len(result.messages) == 2, f"Expected 2 messages, got {len(result.messages)}" + result_texts = [msg.text for msg in result.messages] + assert result_texts == ["Hello!", "Based on the tool result..."] + + async def test_non_streaming_agent_response_empty_after_filtering(self): + """Test that non-streaming AgentResponse with only non-assistant messages produces empty result. + + Counterpart to test_streaming_agent_response_empty_after_filtering for the + non-streaming code path. + """ + + @executor + async def user_only_executor( + messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] + ) -> None: + response = AgentResponse( + messages=[ + Message(role="user", text="user msg 1"), + Message(role="user", text="user msg 2"), + ], + ) + await ctx.yield_output(response) + + workflow = WorkflowBuilder(start_executor=user_only_executor).build() + agent = workflow.as_agent("user-only-agent") + + result = await agent.run("test") + + # No assistant messages means empty messages list + assert len(result.messages) == 0, f"Expected 0 messages, got {len(result.messages)}"