From 61f18a4e2f28ab4cba6ea444c8c08ca2f5d65372 Mon Sep 17 00:00:00 2001 From: OhYee Date: Tue, 3 Mar 2026 11:52:45 +0800 Subject: [PATCH] test: migrate from astream_events to astream for reliable SSE streaming in CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate test implementations to use astream(stream_mode="updates") instead of astream_events to resolve async generator cancellation issues in CI environments. Also switch from httpx.ASGITransport to real HTTP servers with uvicorn for proper SSE stream handling in tests, addressing flaky streaming behavior in Linux CI environments. 将测试实现从 astream_events 迁移到 astream(stream_mode="updates") 以解决 CI 环境中的异步生成器取消问题。同时从 httpx.ASGITransport 切换到使用 uvicorn 的真实 HTTP 服务器,以正确处理测试中的 SSE 流式传输,解决 Linux CI 环境中的不稳定流式行为。 Change-Id: I4e9f694a80e952a94e240f479bb40fef59c0d649 Signed-off-by: OhYee --- .../langchain/test_agent_invoke_methods.py | 139 ++++++++++++++---- .../test_langchain_agui_integration.py | 72 ++++++--- 2 files changed, 157 insertions(+), 54 deletions(-) diff --git a/tests/unittests/integration/langchain/test_agent_invoke_methods.py b/tests/unittests/integration/langchain/test_agent_invoke_methods.py index d24404a..4f5237c 100644 --- a/tests/unittests/integration/langchain/test_agent_invoke_methods.py +++ b/tests/unittests/integration/langchain/test_agent_invoke_methods.py @@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str: return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" +def _start_server(app: FastAPI) -> tuple: + """启动 FastAPI 服务器并返回 (base_url, server, thread) + + 使用真实的 HTTP 服务器而不是 httpx.ASGITransport, + 因为 ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应, + 会导致流式响应被提前取消 (CancelledError)。 + """ + port = _find_free_port() + config = uvicorn.Config( + app, host="127.0.0.1", port=port, log_level="warning" + ) + server = uvicorn.Server(config) + + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + base_url = f"http://127.0.0.1:{port}" + for i in range(50): + try: + httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2) + break + except Exception: + if i == 49: + raise RuntimeError( + f"Server failed to start within {50 * 0.1}s: {base_url}" + ) + time.sleep(0.1) + + return base_url, server, thread + + def _build_mock_openai_app() -> FastAPI: """构建本地 OpenAI 协议兼容的简单服务""" app = FastAPI() @@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]: async def request_agui_events( - server_app, + server_url_or_app: Union[str, FastAPI], messages: List[Dict[str, str]], stream: bool = True, ) -> List[Dict[str, Any]]: - """发送 AG-UI 请求并返回事件列表""" - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=server_app), - base_url="http://test", - ) as client: - response = await client.post( - "/ag-ui/agent", - json={"messages": messages, "stream": stream}, - timeout=60.0, - ) + """发送 AG-UI 请求并返回事件列表 + + Args: + server_url_or_app: 服务器 URL 或 FastAPI app 对象 + messages: 消息列表 + stream: 是否流式响应 + """ + if isinstance(server_url_or_app, str): + async with httpx.AsyncClient(base_url=server_url_or_app) as client: + response = await client.post( + "/ag-ui/agent", + json={"messages": messages, "stream": stream}, + timeout=60.0, + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=server_url_or_app), + base_url="http://test", + ) as client: + response = await client.post( + "/ag-ui/agent", + json={"messages": messages, "stream": stream}, + timeout=60.0, + ) assert response.status_code == 200 return parse_sse_events(response.text) @@ -670,7 +715,7 @@ def assert_openai_tool_call_response( async def request_openai_events( - server_app, + server_url_or_app: Union[str, FastAPI], messages: List[Dict[str, str]], stream: bool = True, ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: @@ -681,15 +726,23 @@ async def request_openai_events( "stream": stream, } - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=server_app), - base_url="http://test", - ) as client: - response = await client.post( - "/openai/v1/chat/completions", - json=payload, - timeout=60.0, - ) + if isinstance(server_url_or_app, str): + async with httpx.AsyncClient(base_url=server_url_or_app) as client: + response = await client.post( + "/openai/v1/chat/completions", + json=payload, + timeout=60.0, + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=server_url_or_app), + base_url="http://test", + ) as client: + response = await client.post( + "/openai/v1/chat/completions", + json=payload, + timeout=60.0, + ) assert response.status_code == 200 @@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str): @pytest.fixture def server_app_astream_events(agent_model): - """创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)""" + """创建使用 astream 的服务器(AG-UI/OpenAI 通用) + + 返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。 + httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。 + + 注意: 这里使用 astream(stream_mode="updates") 而非 astream_events, + 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现 + async generator 被提前取消或事件丢失的问题。 + astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。 + """ agent = build_agent(agent_model) async def invoke_agent(request: AgentRequest): @@ -770,8 +832,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -779,7 +841,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) # ============================================================================= @@ -1753,8 +1820,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -1762,7 +1829,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) @pytest.fixture def server_app_async(self, agent_model): @@ -1787,8 +1859,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -1796,7 +1868,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) @pytest.mark.parametrize( "case_key,prompt", diff --git a/tests/unittests/integration/test_langchain_agui_integration.py b/tests/unittests/integration/test_langchain_agui_integration.py index 6cfb32b..0824a56 100644 --- a/tests/unittests/integration/test_langchain_agui_integration.py +++ b/tests/unittests/integration/test_langchain_agui_integration.py @@ -648,36 +648,62 @@ async def invoke_agent(request: AgentRequest): }] } + # 使用 astream(updates) 代替 astream_events, + # 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中 + # 会出现 async generator 被提前取消或事件丢失的问题。 converter = AgentRunConverter() - async for event in agent.astream_events(input_data, version="v2"): + async for event in agent.astream(input_data, stream_mode="updates"): for item in converter.convert(event): yield item - app = AgentRunServer(invoke_agent=invoke_agent).app + server_app = AgentRunServer(invoke_agent=invoke_agent).app - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=app), - base_url="http://test", - ) as client: - response = await client.post( - "/ag-ui/agent", - json={ - "messages": [{ - "role": "user", - "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", - }], - "stream": True, - }, - timeout=60.0, - ) + # 使用真实的 HTTP 服务器而不是 httpx.ASGITransport, + # 因为 ASGITransport 在 CI 中无法正确处理 SSE 流式响应。 + port = _find_free_port() + config = uvicorn.Config( + server_app, host="127.0.0.1", port=port, log_level="warning" + ) + uvicorn_server = uvicorn.Server(config) + server_thread = threading.Thread(target=uvicorn_server.run, daemon=True) + server_thread.start() + + base_url = f"http://127.0.0.1:{port}" + for i in range(50): + try: + httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2) + break + except Exception: + if i == 49: + raise RuntimeError( + f"Server failed to start within {50 * 0.1}s" + ) + time.sleep(0.1) - assert response.status_code == 200 + try: + async with httpx.AsyncClient(base_url=base_url) as client: + response = await client.post( + "/ag-ui/agent", + json={ + "messages": [{ + "role": "user", + "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", + }], + "stream": True, + }, + timeout=60.0, + ) - events = [line for line in response.text.split("\n") if line] - # Normalize empty delta for consistency with check_result expectations - # astream_events yields "" for empty args, while astream yields "{}" - events = [e.replace('"delta":""', '"delta":"{}"') for e in events] - self.check_result(events) + assert response.status_code == 200 + + events = [line for line in response.text.split("\n") if line] + # Normalize empty delta for consistency with check_result expectations + # astream_events yields "" for empty args, while astream yields "{}" + events = [e.replace('"delta":""', '"delta":"{}"') for e in events] + self.check_result(events) + finally: + uvicorn_server.should_exit = True + server_thread.join(timeout=5) async def test_astream(self, mock_mcp_server): """测试多工具查询场景 (MCP + Local + MockLLM)"""