-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Python: fix: filter non-assistant messages from AgentResponse in WorkflowAgent #4275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
80d9374
43ba31f
dc51ea8
575f6dc
d6ba0f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1296,3 +1296,221 @@ def test_merge_updates_function_result_no_matching_call(self): | |||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Order: text (user), text (assistant), function_result (orphan at end) | ||||||||||||||||||||||||
| assert content_types == ["text", "text", "function_result"] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| class TestWorkflowAgentUserInputFiltering: | ||||||||||||||||||||||||
| """Test cases for filtering user input from AgentResponse in WorkflowAgent. | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| Validates the fix for GitHub issue #4261: when a workflow (e.g., GroupChat) | ||||||||||||||||||||||||
| emits an AgentResponse containing the full conversation history (user + | ||||||||||||||||||||||||
| assistant messages), WorkflowAgent should only surface assistant messages | ||||||||||||||||||||||||
| to avoid re-emitting user input. | ||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def test_streaming_agent_response_filters_user_messages(self): | ||||||||||||||||||||||||
| """Test that streaming filters out user messages from AgentResponse data. | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| When an executor yields an AgentResponse containing both user and assistant | ||||||||||||||||||||||||
| messages (as GroupChat orchestrators do), only assistant messages should | ||||||||||||||||||||||||
| appear in the streaming output. | ||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @executor | ||||||||||||||||||||||||
| async def groupchat_like_executor( | ||||||||||||||||||||||||
| messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] | ||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||
| # Simulate a GroupChat-like executor that emits full conversation history | ||||||||||||||||||||||||
| response = AgentResponse( | ||||||||||||||||||||||||
| messages=[ | ||||||||||||||||||||||||
| Message(role="user", text="hi"), | ||||||||||||||||||||||||
| Message(role="assistant", text="Hello! How can I help?", author_name="Principal"), | ||||||||||||||||||||||||
| Message(role="user", text="what is 2+2?"), | ||||||||||||||||||||||||
| Message(role="assistant", text="2+2 = 4", author_name="Maths Teacher"), | ||||||||||||||||||||||||
| Message(role="assistant", text="The answer is 4.", author_name="Principal"), | ||||||||||||||||||||||||
| ], | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| await ctx.yield_output(response) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() | ||||||||||||||||||||||||
| agent = workflow.as_agent("groupchat-agent") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Collect streaming updates | ||||||||||||||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||||||||||||||
| async for update in agent.run("hi", stream=True): | ||||||||||||||||||||||||
| updates.append(update) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Should only have assistant messages (3 out of 5) | ||||||||||||||||||||||||
| assert len(updates) == 3, f"Expected 3 assistant updates, got {len(updates)}" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Verify all updates are assistant role | ||||||||||||||||||||||||
| for update in updates: | ||||||||||||||||||||||||
| assert update.role == "assistant", f"Expected role='assistant', got role='{update.role}'" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Verify author_name is propagated correctly | ||||||||||||||||||||||||
| assert updates[0].author_name == "Principal" | ||||||||||||||||||||||||
| assert updates[1].author_name == "Maths Teacher" | ||||||||||||||||||||||||
| assert updates[2].author_name == "Principal" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Verify the content is correct | ||||||||||||||||||||||||
| texts = [u.text for u in updates] | ||||||||||||||||||||||||
|
Comment on lines
+1348
to
+1355
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider also asserting
Suggested change
|
||||||||||||||||||||||||
| assert "Hello! How can I help?" in texts | ||||||||||||||||||||||||
| assert "2+2 = 4" in texts | ||||||||||||||||||||||||
| assert "The answer is 4." in texts | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Verify user messages are NOT present | ||||||||||||||||||||||||
| assert "hi" not in texts | ||||||||||||||||||||||||
| assert "what is 2+2?" not in texts | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def test_non_streaming_agent_response_filters_user_messages(self): | ||||||||||||||||||||||||
| """Test that non-streaming also filters out user messages from AgentResponse data.""" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @executor | ||||||||||||||||||||||||
| async def groupchat_like_executor( | ||||||||||||||||||||||||
| messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] | ||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||
| response = AgentResponse( | ||||||||||||||||||||||||
| messages=[ | ||||||||||||||||||||||||
| Message(role="user", text="hi"), | ||||||||||||||||||||||||
| Message(role="assistant", text="Hello!", author_name="Bot"), | ||||||||||||||||||||||||
| Message(role="user", text="bye"), | ||||||||||||||||||||||||
| Message(role="assistant", text="Goodbye!", author_name="Bot"), | ||||||||||||||||||||||||
| ], | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| await ctx.yield_output(response) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build() | ||||||||||||||||||||||||
| agent = workflow.as_agent("groupchat-agent") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| result = await agent.run("hi") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Should only have assistant messages (2 out of 4) | ||||||||||||||||||||||||
| assert len(result.messages) == 2, f"Expected 2 messages, got {len(result.messages)}" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| texts = [msg.text for msg in result.messages] | ||||||||||||||||||||||||
| assert texts == ["Hello!", "Goodbye!"] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def test_streaming_agent_response_all_assistant_unchanged(self): | ||||||||||||||||||||||||
| """Test that AgentResponse with only assistant messages works correctly (no regression).""" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @executor | ||||||||||||||||||||||||
| async def assistant_only_executor( | ||||||||||||||||||||||||
| messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] | ||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||
| response = AgentResponse( | ||||||||||||||||||||||||
| messages=[ | ||||||||||||||||||||||||
| Message(role="assistant", text="First response"), | ||||||||||||||||||||||||
| Message(role="assistant", text="Second response"), | ||||||||||||||||||||||||
| ], | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| await ctx.yield_output(response) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| workflow = WorkflowBuilder(start_executor=assistant_only_executor).build() | ||||||||||||||||||||||||
| agent = workflow.as_agent("assistant-only-agent") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||||||||||||||
| async for update in agent.run("test", stream=True): | ||||||||||||||||||||||||
| updates.append(update) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| assert len(updates) == 2 | ||||||||||||||||||||||||
| texts = [u.text for u in updates] | ||||||||||||||||||||||||
| assert texts == ["First response", "Second response"] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def test_streaming_agent_response_empty_after_filtering(self): | ||||||||||||||||||||||||
| """Test that AgentResponse with only user messages produces no updates.""" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @executor | ||||||||||||||||||||||||
| async def user_only_executor( | ||||||||||||||||||||||||
| messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] | ||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||
| # Edge case: all messages are user role | ||||||||||||||||||||||||
| response = AgentResponse( | ||||||||||||||||||||||||
| messages=[ | ||||||||||||||||||||||||
| Message(role="user", text="user msg 1"), | ||||||||||||||||||||||||
| Message(role="user", text="user msg 2"), | ||||||||||||||||||||||||
| ], | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| await ctx.yield_output(response) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| workflow = WorkflowBuilder(start_executor=user_only_executor).build() | ||||||||||||||||||||||||
| agent = workflow.as_agent("user-only-agent") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||||||||||||||
| async for update in agent.run("test", stream=True): | ||||||||||||||||||||||||
| updates.append(update) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # No assistant messages means no updates | ||||||||||||||||||||||||
| assert len(updates) == 0 | ||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a test for non-assistant, non-user roles (e.g., 'system' or 'tool') to explicitly document that the filter drops those as well. This would prevent a future regression if someone expects system messages to pass through.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copy all feedback - I'll address tonight. |
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def test_streaming_agent_response_filters_system_and_tool_roles(self): | ||||||||||||||||||||||||
| """Test that system and tool role messages are also filtered out. | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| The allowlist filter (role == "assistant") intentionally drops all non-assistant | ||||||||||||||||||||||||
| roles, including "system" and "tool". This test documents that behavior explicitly | ||||||||||||||||||||||||
| to prevent future regressions if someone expects those messages to pass through. | ||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @executor | ||||||||||||||||||||||||
| async def mixed_role_executor( | ||||||||||||||||||||||||
| messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] | ||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||
| response = AgentResponse( | ||||||||||||||||||||||||
| messages=[ | ||||||||||||||||||||||||
| Message(role="system", text="You are a helpful assistant."), | ||||||||||||||||||||||||
| Message(role="user", text="hi"), | ||||||||||||||||||||||||
| Message(role="assistant", text="Hello!", author_name="Bot"), | ||||||||||||||||||||||||
| Message(role="tool", text="tool result data"), | ||||||||||||||||||||||||
| Message(role="assistant", text="Based on the tool result...", author_name="Bot"), | ||||||||||||||||||||||||
| ], | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| await ctx.yield_output(response) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| workflow = WorkflowBuilder(start_executor=mixed_role_executor).build() | ||||||||||||||||||||||||
| agent = workflow.as_agent("mixed-role-agent") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Streaming path | ||||||||||||||||||||||||
| updates: list[AgentResponseUpdate] = [] | ||||||||||||||||||||||||
| async for update in agent.run("hi", stream=True): | ||||||||||||||||||||||||
| updates.append(update) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Only assistant messages should survive (2 out of 5) | ||||||||||||||||||||||||
| assert len(updates) == 2, f"Expected 2 assistant updates, got {len(updates)}" | ||||||||||||||||||||||||
| for update in updates: | ||||||||||||||||||||||||
| assert update.role == "assistant" | ||||||||||||||||||||||||
| texts = [u.text for u in updates] | ||||||||||||||||||||||||
| assert texts == ["Hello!", "Based on the tool result..."] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # Non-streaming path | ||||||||||||||||||||||||
| workflow2 = WorkflowBuilder(start_executor=mixed_role_executor).build() | ||||||||||||||||||||||||
| agent2 = workflow2.as_agent("mixed-role-agent-2") | ||||||||||||||||||||||||
| result = await agent2.run("hi") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| assert len(result.messages) == 2, f"Expected 2 messages, got {len(result.messages)}" | ||||||||||||||||||||||||
| result_texts = [msg.text for msg in result.messages] | ||||||||||||||||||||||||
| assert result_texts == ["Hello!", "Based on the tool result..."] | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| async def test_non_streaming_agent_response_empty_after_filtering(self): | ||||||||||||||||||||||||
| """Test that non-streaming AgentResponse with only non-assistant messages produces empty result. | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| Counterpart to test_streaming_agent_response_empty_after_filtering for the | ||||||||||||||||||||||||
| non-streaming code path. | ||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @executor | ||||||||||||||||||||||||
| async def user_only_executor( | ||||||||||||||||||||||||
| messages: list[Message], ctx: WorkflowContext[Never, AgentResponse] | ||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||
| response = AgentResponse( | ||||||||||||||||||||||||
| messages=[ | ||||||||||||||||||||||||
| Message(role="user", text="user msg 1"), | ||||||||||||||||||||||||
| Message(role="user", text="user msg 2"), | ||||||||||||||||||||||||
| ], | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
| await ctx.yield_output(response) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| workflow = WorkflowBuilder(start_executor=user_only_executor).build() | ||||||||||||||||||||||||
| agent = workflow.as_agent("user-only-agent") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| result = await agent.run("test") | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| # No assistant messages means empty messages list | ||||||||||||||||||||||||
| assert len(result.messages) == 0, f"Expected 0 messages, got {len(result.messages)}" | ||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests validate filtering for the AgentResponse output branch, but they don’t cover the GroupChat-style
list[Message]output path (which is the likely source of #4261). If the goal is to prevent re-emitting user inputs for GroupChat workflows, add a regression test that runs a minimal GroupChat (or an executor that yieldslist[Message]conversation history) throughworkflow.as_agent()and asserts user-role messages are not surfaced.