Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 108 additions & 31 deletions tests/unittests/integration/langchain/test_agent_invoke_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str:
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"


def _start_server(app: FastAPI) -> tuple:
    """Start *app* in a background uvicorn server and return (base_url, server, thread).

    A real HTTP server is used instead of httpx.ASGITransport because
    ASGITransport cannot correctly handle SSE streaming responses in the
    CI environment, which causes streaming responses to be cancelled
    prematurely (CancelledError).

    Returns:
        tuple: (base_url, uvicorn.Server, threading.Thread). The caller is
        responsible for shutting the server down by setting
        ``server.should_exit = True`` and joining the thread.

    Raises:
        RuntimeError: if the server does not answer its health endpoint
        within the readiness deadline.
    """
    port = _find_free_port()
    config = uvicorn.Config(
        app, host="127.0.0.1", port=port, log_level="warning"
    )
    server = uvicorn.Server(config)

    # Daemon thread so a hung server cannot keep the test process alive.
    thread = threading.Thread(target=server.run, daemon=True)
    thread.start()

    base_url = f"http://127.0.0.1:{port}"
    # Poll the health endpoint until the server is ready. Use a wall-clock
    # deadline instead of a fixed iteration count so the error message is
    # accurate: each failed attempt costs up to 0.2s (connect timeout) plus
    # a 0.1s sleep, so 50 iterations actually span ~15s, not 5s.
    deadline = time.monotonic() + 15.0
    while True:
        try:
            httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2)
            break
        except Exception as exc:
            if time.monotonic() >= deadline:
                # Chain the last probe error for easier CI diagnosis.
                raise RuntimeError(
                    f"Server failed to start within 15s: {base_url}"
                ) from exc
            time.sleep(0.1)

    return base_url, server, thread


def _build_mock_openai_app() -> FastAPI:
"""构建本地 OpenAI 协议兼容的简单服务"""
app = FastAPI()
Expand Down Expand Up @@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]:


async def request_agui_events(
server_app,
server_url_or_app: Union[str, FastAPI],
messages: List[Dict[str, str]],
stream: bool = True,
) -> List[Dict[str, Any]]:
"""发送 AG-UI 请求并返回事件列表"""
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=server_app),
base_url="http://test",
) as client:
response = await client.post(
"/ag-ui/agent",
json={"messages": messages, "stream": stream},
timeout=60.0,
)
"""发送 AG-UI 请求并返回事件列表

Args:
server_url_or_app: 服务器 URL 或 FastAPI app 对象
messages: 消息列表
stream: 是否流式响应
"""
if isinstance(server_url_or_app, str):
async with httpx.AsyncClient(base_url=server_url_or_app) as client:
response = await client.post(
"/ag-ui/agent",
json={"messages": messages, "stream": stream},
timeout=60.0,
)
else:
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=server_url_or_app),
base_url="http://test",
) as client:
response = await client.post(
"/ag-ui/agent",
json={"messages": messages, "stream": stream},
timeout=60.0,
)

assert response.status_code == 200
return parse_sse_events(response.text)
Expand Down Expand Up @@ -670,7 +715,7 @@ def assert_openai_tool_call_response(


async def request_openai_events(
server_app,
server_url_or_app: Union[str, FastAPI],
messages: List[Dict[str, str]],
stream: bool = True,
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
Expand All @@ -681,15 +726,23 @@ async def request_openai_events(
"stream": stream,
}

async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=server_app),
base_url="http://test",
) as client:
response = await client.post(
"/openai/v1/chat/completions",
json=payload,
timeout=60.0,
)
if isinstance(server_url_or_app, str):
async with httpx.AsyncClient(base_url=server_url_or_app) as client:
response = await client.post(
"/openai/v1/chat/completions",
json=payload,
timeout=60.0,
)
else:
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=server_url_or_app),
base_url="http://test",
) as client:
response = await client.post(
"/openai/v1/chat/completions",
json=payload,
timeout=60.0,
)

assert response.status_code == 200

Expand Down Expand Up @@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str):

@pytest.fixture
def server_app_astream_events(agent_model):
"""创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)"""
"""创建使用 astream 的服务器(AG-UI/OpenAI 通用)

返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。
httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。

注意: 这里使用 astream(stream_mode="updates") 而非 astream_events,
因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现
async generator 被提前取消或事件丢失的问题。
astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。
"""
agent = build_agent(agent_model)

async def invoke_agent(request: AgentRequest):
Expand All @@ -770,16 +832,21 @@ async def invoke_agent(request: AgentRequest):
converter = AgentRunConverter()

async def generator():
async for event in agent.astream_events(
cast(Any, input_data), version="v2"
async for event in agent.astream(
cast(Any, input_data), stream_mode="updates"
):
for item in converter.convert(event):
yield item

return generator()

server = AgentRunServer(invoke_agent=invoke_agent)
return server.app
base_url, uvicorn_server, thread = _start_server(server.app)

yield base_url

uvicorn_server.should_exit = True
thread.join(timeout=5)


# =============================================================================
Expand Down Expand Up @@ -1753,16 +1820,21 @@ async def invoke_agent(request: AgentRequest):
converter = AgentRunConverter()

async def generator():
async for event in agent.astream_events(
cast(Any, input_data), version="v2"
async for event in agent.astream(
cast(Any, input_data), stream_mode="updates"
):
for item in converter.convert(event):
yield item

return generator()

server = AgentRunServer(invoke_agent=invoke_agent)
return server.app
base_url, uvicorn_server, thread = _start_server(server.app)

yield base_url

uvicorn_server.should_exit = True
thread.join(timeout=5)

@pytest.fixture
def server_app_async(self, agent_model):
Expand All @@ -1787,16 +1859,21 @@ async def invoke_agent(request: AgentRequest):
converter = AgentRunConverter()

async def generator():
async for event in agent.astream_events(
cast(Any, input_data), version="v2"
async for event in agent.astream(
cast(Any, input_data), stream_mode="updates"
):
for item in converter.convert(event):
yield item

return generator()

server = AgentRunServer(invoke_agent=invoke_agent)
return server.app
base_url, uvicorn_server, thread = _start_server(server.app)

yield base_url

uvicorn_server.should_exit = True
thread.join(timeout=5)

@pytest.mark.parametrize(
"case_key,prompt",
Expand Down
72 changes: 49 additions & 23 deletions tests/unittests/integration/test_langchain_agui_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,36 +648,62 @@ async def invoke_agent(request: AgentRequest):
}]
}

# 使用 astream(updates) 代替 astream_events,
# 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中
# 会出现 async generator 被提前取消或事件丢失的问题。
converter = AgentRunConverter()
async for event in agent.astream_events(input_data, version="v2"):
async for event in agent.astream(input_data, stream_mode="updates"):
for item in converter.convert(event):
yield item

app = AgentRunServer(invoke_agent=invoke_agent).app
server_app = AgentRunServer(invoke_agent=invoke_agent).app

async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/ag-ui/agent",
json={
"messages": [{
"role": "user",
"content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息",
}],
"stream": True,
},
timeout=60.0,
)
# 使用真实的 HTTP 服务器而不是 httpx.ASGITransport,
# 因为 ASGITransport 在 CI 中无法正确处理 SSE 流式响应。
port = _find_free_port()
config = uvicorn.Config(
server_app, host="127.0.0.1", port=port, log_level="warning"
)
uvicorn_server = uvicorn.Server(config)
server_thread = threading.Thread(target=uvicorn_server.run, daemon=True)
server_thread.start()

base_url = f"http://127.0.0.1:{port}"
for i in range(50):
try:
httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2)
break
except Exception:
if i == 49:
raise RuntimeError(
f"Server failed to start within {50 * 0.1}s"
)
time.sleep(0.1)

assert response.status_code == 200
try:
async with httpx.AsyncClient(base_url=base_url) as client:
response = await client.post(
"/ag-ui/agent",
json={
"messages": [{
"role": "user",
"content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息",
}],
"stream": True,
},
timeout=60.0,
)

events = [line for line in response.text.split("\n") if line]
# Normalize empty delta for consistency with check_result expectations
# astream_events yields "" for empty args, while astream yields "{}"
events = [e.replace('"delta":""', '"delta":"{}"') for e in events]
self.check_result(events)
assert response.status_code == 200

events = [line for line in response.text.split("\n") if line]
# Normalize empty delta for consistency with check_result expectations
# astream_events yields "" for empty args, while astream yields "{}"
events = [e.replace('"delta":""', '"delta":"{}"') for e in events]
self.check_result(events)
finally:
uvicorn_server.should_exit = True
server_thread.join(timeout=5)

async def test_astream(self, mock_mcp_server):
"""测试多工具查询场景 (MCP + Local + MockLLM)"""
Expand Down