Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,12 @@ def _convert_workflow_events_to_agent_response(
)

if isinstance(data, AgentResponse):
messages.extend(data.messages)
# Filter to only assistant messages — system, tool, and user messages
# are intentionally excluded. System prompts and tool results are
# internal workflow artifacts; user messages would be re-emitted
# (e.g., from GroupChat orchestrators that include full conversation history).
assistant_messages = [msg for msg in data.messages if msg.role == "assistant"]
messages.extend(assistant_messages)
raw_representations.append(data.raw_representation)
merged_usage = add_usage_details(merged_usage, data.usage_details)
latest_created_at = (
Expand Down Expand Up @@ -563,9 +568,15 @@ def _convert_workflow_event_to_agent_response_updates(
data.author_name = executor_id
return [data]
if isinstance(data, AgentResponse):
# Convert each message in AgentResponse to an AgentResponseUpdate
# Convert each assistant message in AgentResponse to an AgentResponseUpdate.
# Filter out non-assistant messages — system, tool, and user messages
# are intentionally excluded. System prompts and tool results are
# internal workflow artifacts; user messages would be re-emitted
# (e.g., from GroupChat orchestrators that include full conversation history).
updates: list[AgentResponseUpdate] = []
for msg in data.messages:
if msg.role != "assistant":
continue
updates.append(
AgentResponseUpdate(
contents=list(msg.contents),
Expand Down
218 changes: 218 additions & 0 deletions python/packages/core/tests/workflow/test_workflow_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,3 +1296,221 @@ def test_merge_updates_function_result_no_matching_call(self):

# Order: text (user), text (assistant), function_result (orphan at end)
assert content_types == ["text", "text", "function_result"]


class TestWorkflowAgentUserInputFiltering:
"""Test cases for filtering user input from AgentResponse in WorkflowAgent.

Validates the fix for GitHub issue #4261: when a workflow (e.g., GroupChat)
emits an AgentResponse containing the full conversation history (user +
assistant messages), WorkflowAgent should only surface assistant messages
to avoid re-emitting user input.
"""

async def test_streaming_agent_response_filters_user_messages(self):
"""Test that streaming filters out user messages from AgentResponse data.

When an executor yields an AgentResponse containing both user and assistant
messages (as GroupChat orchestrators do), only assistant messages should
appear in the streaming output.
"""

@executor
async def groupchat_like_executor(
messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
) -> None:
# Simulate a GroupChat-like executor that emits full conversation history
response = AgentResponse(
messages=[
Message(role="user", text="hi"),
Message(role="assistant", text="Hello! How can I help?", author_name="Principal"),
Message(role="user", text="what is 2+2?"),
Message(role="assistant", text="2+2 = 4", author_name="Maths Teacher"),
Message(role="assistant", text="The answer is 4.", author_name="Principal"),
],
)
await ctx.yield_output(response)

workflow = WorkflowBuilder(start_executor=groupchat_like_executor).build()
agent = workflow.as_agent("groupchat-agent")

Comment on lines +1310 to +1336
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests validate filtering for the AgentResponse output branch, but they don’t cover the GroupChat-style list[Message] output path (which is the likely source of #4261). If the goal is to prevent re-emitting user inputs for GroupChat workflows, add a regression test that runs a minimal GroupChat (or an executor that yields list[Message] conversation history) through workflow.as_agent() and asserts user-role messages are not surfaced.

Copilot uses AI. Check for mistakes.
# Collect streaming updates
updates: list[AgentResponseUpdate] = []
async for update in agent.run("hi", stream=True):
updates.append(update)

# Should only have assistant messages (3 out of 5)
assert len(updates) == 3, f"Expected 3 assistant updates, got {len(updates)}"

# Verify all updates are assistant role
for update in updates:
assert update.role == "assistant", f"Expected role='assistant', got role='{update.role}'"

# Verify author_name is propagated correctly
assert updates[0].author_name == "Principal"
assert updates[1].author_name == "Maths Teacher"
assert updates[2].author_name == "Principal"

# Verify the content is correct
texts = [u.text for u in updates]
Comment on lines +1348 to +1355
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider also asserting update.author_name for each update to verify the production code's author_name=msg.author_name mapping is preserved through filtering. Currently only role and text are verified on streaming updates.

Suggested change
# Verify the content is correct
texts = [u.text for u in updates]
# Verify all updates are assistant role
for update in updates:
assert update.role == "assistant", f"Expected role='assistant', got role='{update.role}'"
# Verify author_name is propagated correctly
assert updates[0].author_name == "Principal"
assert updates[1].author_name == "Maths Teacher"
assert updates[2].author_name == "Principal"

assert "Hello! How can I help?" in texts
assert "2+2 = 4" in texts
assert "The answer is 4." in texts

# Verify user messages are NOT present
assert "hi" not in texts
assert "what is 2+2?" not in texts

async def test_non_streaming_agent_response_filters_user_messages(self):
    """Non-streaming run: user turns inside a yielded AgentResponse are not returned.

    The executor yields a four-message history that alternates user and
    assistant turns. The result is expected to contain just the two
    assistant messages, in their original order.
    """

    @executor
    async def groupchat_like_executor(
        messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        # Emit a full user/assistant history in a single AgentResponse.
        history = [
            Message(role="user", text="hi"),
            Message(role="assistant", text="Hello!", author_name="Bot"),
            Message(role="user", text="bye"),
            Message(role="assistant", text="Goodbye!", author_name="Bot"),
        ]
        await ctx.yield_output(AgentResponse(messages=history))

    agent = WorkflowBuilder(start_executor=groupchat_like_executor).build().as_agent("groupchat-agent")

    result = await agent.run("hi")

    # Two of the four yielded messages are assistant turns; only those remain.
    assert len(result.messages) == 2, f"Expected 2 messages, got {len(result.messages)}"

    # Order of the surviving messages must match the yielded order.
    assert [m.text for m in result.messages] == ["Hello!", "Goodbye!"]

async def test_streaming_agent_response_all_assistant_unchanged(self):
    """Streaming run: an assistant-only AgentResponse passes through untouched.

    Regression guard for the role filter: when every yielded message already
    has the assistant role, each one must still surface as its own update,
    in order.
    """

    @executor
    async def assistant_only_executor(
        messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        replies = [
            Message(role="assistant", text="First response"),
            Message(role="assistant", text="Second response"),
        ]
        await ctx.yield_output(AgentResponse(messages=replies))

    agent = WorkflowBuilder(start_executor=assistant_only_executor).build().as_agent("assistant-only-agent")

    # Drain the stream into a list so count and order can be checked.
    updates: list[AgentResponseUpdate] = [chunk async for chunk in agent.run("test", stream=True)]

    assert len(updates) == 2
    assert [chunk.text for chunk in updates] == ["First response", "Second response"]

async def test_streaming_agent_response_empty_after_filtering(self):
    """Streaming run: an AgentResponse holding only user messages yields nothing.

    Edge case for the role filter: with no assistant message to keep, the
    stream is expected to produce zero updates.
    """

    @executor
    async def user_only_executor(
        messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        # Every message carries the user role, so none should be surfaced.
        user_turns = [
            Message(role="user", text="user msg 1"),
            Message(role="user", text="user msg 2"),
        ]
        await ctx.yield_output(AgentResponse(messages=user_turns))

    agent = WorkflowBuilder(start_executor=user_only_executor).build().as_agent("user-only-agent")

    updates: list[AgentResponseUpdate] = [chunk async for chunk in agent.run("test", stream=True)]

    # Nothing assistant-authored was yielded, so nothing may be streamed.
    assert len(updates) == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a test for non-assistant, non-user roles (e.g., 'system' or 'tool') to explicitly document that the filter drops those as well. This would prevent a future regression if someone expects system messages to pass through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — I'll address all of this feedback tonight.


async def test_streaming_agent_response_filters_system_and_tool_roles(self):
    """Check that system and tool messages are dropped along with user messages.

    The filter is an allowlist (role == "assistant"), so every other role is
    expected to be removed. The test pins that behavior for both the streaming
    and the non-streaming path, so a later change that lets "system" or "tool"
    messages through is caught.
    """

    @executor
    async def mixed_role_executor(
        messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        # One message for each non-assistant role, interleaved with two assistant turns.
        history = [
            Message(role="system", text="You are a helpful assistant."),
            Message(role="user", text="hi"),
            Message(role="assistant", text="Hello!", author_name="Bot"),
            Message(role="tool", text="tool result data"),
            Message(role="assistant", text="Based on the tool result...", author_name="Bot"),
        ]
        await ctx.yield_output(AgentResponse(messages=history))

    # --- Streaming path ---
    streaming_agent = WorkflowBuilder(start_executor=mixed_role_executor).build().as_agent("mixed-role-agent")
    updates: list[AgentResponseUpdate] = [chunk async for chunk in streaming_agent.run("hi", stream=True)]

    # Two of the five yielded messages are assistant turns.
    assert len(updates) == 2, f"Expected 2 assistant updates, got {len(updates)}"
    for chunk in updates:
        assert chunk.role == "assistant"
    assert [chunk.text for chunk in updates] == ["Hello!", "Based on the tool result..."]

    # --- Non-streaming path (uses a freshly built workflow and agent) ---
    blocking_agent = WorkflowBuilder(start_executor=mixed_role_executor).build().as_agent("mixed-role-agent-2")
    result = await blocking_agent.run("hi")

    assert len(result.messages) == 2, f"Expected 2 messages, got {len(result.messages)}"
    assert [m.text for m in result.messages] == ["Hello!", "Based on the tool result..."]

async def test_non_streaming_agent_response_empty_after_filtering(self):
    """Non-streaming run: an AgentResponse holding only user messages gives an empty result.

    Mirrors test_streaming_agent_response_empty_after_filtering, but drives
    the agent through the non-streaming code path.
    """

    @executor
    async def user_only_executor(
        messages: list[Message], ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        # Every message carries the user role, so none should be returned.
        user_turns = [
            Message(role="user", text="user msg 1"),
            Message(role="user", text="user msg 2"),
        ]
        await ctx.yield_output(AgentResponse(messages=user_turns))

    agent = WorkflowBuilder(start_executor=user_only_executor).build().as_agent("user-only-agent")

    result = await agent.run("test")

    # With no assistant message to keep, the returned message list must be empty.
    assert len(result.messages) == 0, f"Expected 0 messages, got {len(result.messages)}"
Loading