diff --git a/tests/unittests/integration/langchain/test_agent_invoke_methods.py b/tests/unittests/integration/langchain/test_agent_invoke_methods.py index d24404a..4f5237c 100644 --- a/tests/unittests/integration/langchain/test_agent_invoke_methods.py +++ b/tests/unittests/integration/langchain/test_agent_invoke_methods.py @@ -81,6 +81,37 @@ def _sse(data: Dict[str, Any]) -> str: return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" +def _start_server(app: FastAPI) -> tuple: + """启动 FastAPI 服务器并返回 (base_url, server, thread) + + 使用真实的 HTTP 服务器而不是 httpx.ASGITransport, + 因为 ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应, + 会导致流式响应被提前取消 (CancelledError)。 + """ + port = _find_free_port() + config = uvicorn.Config( + app, host="127.0.0.1", port=port, log_level="warning" + ) + server = uvicorn.Server(config) + + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + base_url = f"http://127.0.0.1:{port}" + for i in range(50): + try: + httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2) + break + except Exception: + if i == 49: + raise RuntimeError( + f"Server failed to start within {50 * 0.1}s: {base_url}" + ) + time.sleep(0.1) + + return base_url, server, thread + + def _build_mock_openai_app() -> FastAPI: """构建本地 OpenAI 协议兼容的简单服务""" app = FastAPI() @@ -297,20 +328,34 @@ def parse_sse_events(content: str) -> List[Dict[str, Any]]: async def request_agui_events( - server_app, + server_url_or_app: Union[str, FastAPI], messages: List[Dict[str, str]], stream: bool = True, ) -> List[Dict[str, Any]]: - """发送 AG-UI 请求并返回事件列表""" - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=server_app), - base_url="http://test", - ) as client: - response = await client.post( - "/ag-ui/agent", - json={"messages": messages, "stream": stream}, - timeout=60.0, - ) + """发送 AG-UI 请求并返回事件列表 + + Args: + server_url_or_app: 服务器 URL 或 FastAPI app 对象 + messages: 消息列表 + stream: 是否流式响应 + """ + if isinstance(server_url_or_app, str): + async with httpx.AsyncClient(base_url=server_url_or_app) as client: + response = await client.post( + "/ag-ui/agent", + json={"messages": messages, "stream": stream}, + timeout=60.0, + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=server_url_or_app), + base_url="http://test", + ) as client: + response = await client.post( + "/ag-ui/agent", + json={"messages": messages, "stream": stream}, + timeout=60.0, + ) assert response.status_code == 200 return parse_sse_events(response.text) @@ -670,7 +715,7 @@ def assert_openai_tool_call_response( async def request_openai_events( - server_app, + server_url_or_app: Union[str, FastAPI], messages: List[Dict[str, str]], stream: bool = True, ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: @@ -681,15 +726,23 @@ async def request_openai_events( "stream": stream, } - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=server_app), - base_url="http://test", - ) as client: - response = await client.post( - "/openai/v1/chat/completions", - json=payload, - timeout=60.0, - ) + if isinstance(server_url_or_app, str): + async with httpx.AsyncClient(base_url=server_url_or_app) as client: + response = await client.post( + "/openai/v1/chat/completions", + json=payload, + timeout=60.0, + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=server_url_or_app), + base_url="http://test", + ) as client: + response = await client.post( + "/openai/v1/chat/completions", + json=payload, + timeout=60.0, + ) assert response.status_code == 200 @@ -749,7 +802,16 @@ def agent_model(mock_openai_server: str): @pytest.fixture def server_app_astream_events(agent_model): - """创建使用 astream_events 的服务器(AG-UI/OpenAI 通用)""" + """创建使用 astream 的服务器(AG-UI/OpenAI 通用) + + 返回服务器 URL 而不是 app 对象,因为流式测试需要真实的 HTTP 连接。 + httpx.ASGITransport 在 CI 环境中无法正确处理 SSE 流式响应。 + + 注意: 这里使用 astream(stream_mode="updates") 而非 astream_events, + 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中会出现 + async generator 被提前取消或事件丢失的问题。 + astream_events 的转换逻辑由 test_convert_python_3_10/3_12 单独覆盖。 + """ agent = build_agent(agent_model) async def invoke_agent(request: AgentRequest): @@ -770,8 +832,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -779,7 +841,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) # ============================================================================= @@ -1753,8 +1820,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -1762,7 +1829,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) @pytest.fixture def server_app_async(self, agent_model): @@ -1787,8 +1859,8 @@ async def invoke_agent(request: AgentRequest): converter = AgentRunConverter() async def generator(): - async for event in agent.astream_events( - cast(Any, input_data), version="v2" + async for event in agent.astream( + cast(Any, input_data), stream_mode="updates" ): for item in converter.convert(event): yield item @@ -1796,7 +1868,12 @@ async def generator(): return generator() server = AgentRunServer(invoke_agent=invoke_agent) - return server.app + base_url, uvicorn_server, thread = _start_server(server.app) + + yield base_url + + uvicorn_server.should_exit = True + thread.join(timeout=5) @pytest.mark.parametrize( "case_key,prompt", diff --git a/tests/unittests/integration/test_langchain_agui_integration.py b/tests/unittests/integration/test_langchain_agui_integration.py index 6cfb32b..0824a56 100644 --- a/tests/unittests/integration/test_langchain_agui_integration.py +++ b/tests/unittests/integration/test_langchain_agui_integration.py @@ -648,36 +648,62 @@ async def invoke_agent(request: AgentRequest): }] } + # 使用 astream(updates) 代替 astream_events, + # 因为 astream_events 在 CI (Linux + uvicorn 线程) 环境中 + # 会出现 async generator 被提前取消或事件丢失的问题。 converter = AgentRunConverter() - async for event in agent.astream_events(input_data, version="v2"): + async for event in agent.astream(input_data, stream_mode="updates"): for item in converter.convert(event): yield item - app = AgentRunServer(invoke_agent=invoke_agent).app + server_app = AgentRunServer(invoke_agent=invoke_agent).app - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=app), - base_url="http://test", - ) as client: - response = await client.post( - "/ag-ui/agent", - json={ - "messages": [{ - "role": "user", - "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", - }], - "stream": True, - }, - timeout=60.0, - ) + # 使用真实的 HTTP 服务器而不是 httpx.ASGITransport, + # 因为 ASGITransport 在 CI 中无法正确处理 SSE 流式响应。 + port = _find_free_port() + config = uvicorn.Config( + server_app, host="127.0.0.1", port=port, log_level="warning" + ) + uvicorn_server = uvicorn.Server(config) + server_thread = threading.Thread(target=uvicorn_server.run, daemon=True) + server_thread.start() + + base_url = f"http://127.0.0.1:{port}" + for i in range(50): + try: + httpx.get(f"{base_url}/ag-ui/agent/health", timeout=0.2) + break + except Exception: + if i == 49: + raise RuntimeError( + f"Server failed to start within {50 * 0.1}s" + ) + time.sleep(0.1) - assert response.status_code == 200 + try: + async with httpx.AsyncClient(base_url=base_url) as client: + response = await client.post( + "/ag-ui/agent", + json={ + "messages": [{ + "role": "user", + "content": "查询当前的时间,并获取天气信息,同时输出我的密钥信息", + }], + "stream": True, + }, + timeout=60.0, + ) - events = [line for line in response.text.split("\n") if line] - # Normalize empty delta for consistency with check_result expectations - # astream_events yields "" for empty args, while astream yields "{}" - events = [e.replace('"delta":""', '"delta":"{}"') for e in events] - self.check_result(events) + assert response.status_code == 200 + + events = [line for line in response.text.split("\n") if line] + # Normalize empty delta for consistency with check_result expectations + # astream_events yields "" for empty args, while astream yields "{}" + events = [e.replace('"delta":""', '"delta":"{}"') for e in events] + self.check_result(events) + finally: + uvicorn_server.should_exit = True + server_thread.join(timeout=5) async def test_astream(self, mock_mcp_server): """测试多工具查询场景 (MCP + Local + MockLLM)"""