diff --git a/src/chatbot/api/async_api.py b/src/chatbot/api/async_api.py index c45456b..ac50055 100644 --- a/src/chatbot/api/async_api.py +++ b/src/chatbot/api/async_api.py @@ -60,12 +60,10 @@ async def get_openai_response_stream( settings: ServiceSettings = load_settings_file("config.toml", ServiceSettings) OPENAI_API_KEY: str = settings.sdk_key OPENAI_ENDPOINT: str = settings.sdk_base_url + "/v1/chat/completions" - headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json", } - if len(st.session_state[session_keys["short_term_memory"]]) == 0: st.session_state[session_keys["short_term_memory"]].append({"role": "system", "content": SYSTEMPROMOT}) @@ -77,14 +75,13 @@ async def get_openai_response_stream( Logger.custom( f"发送OpenAI请求 (第 {attempt + 1}/{max_retries} 次)", badge=Badge("连接", fore="black", back="cyan") ) - try: # 重置响应文本 st.session_state[session_keys["text_response"]] = "" first_sentence_received = False + request_completed = False # 确保用户消息在短期记忆中(每次重试都重新添加) - # 先检查最后一条是否是相同的用户消息,如果不是则添加 if ( not st.session_state[session_keys["short_term_memory"]] or st.session_state[session_keys["short_term_memory"]][-1] != user_message @@ -104,157 +101,138 @@ async def get_openai_response_stream( "stream": True, } - # 添加调试日志,显示发送的消息 Logger.info(f"发送的消息: {data['messages']}") - # 使用统一的超时时间控制整个首句获取过程 - async def get_first_sentence_with_timeout(data): - nonlocal first_sentence_received - + # 完整的流处理函数 + async def process_complete_stream(data): + nonlocal first_sentence_received, request_completed buffer = "" t_start = time.monotonic() first_sentence_time = None # 设置超时 timeout = aiohttp.ClientTimeout( - total=request_timeout, connect=first_sentence_timeout - 0.5, sock_read=request_timeout + total=request_timeout, + connect=10.0, # 连接超时 + sock_read=request_timeout, # 读取超时 ) - async with aiohttp.ClientSession(timeout=timeout) as session: - Logger.debug("正在建立连接...") - - async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp: - Logger.debug(f"连接已建立,状态码: {resp.status}") - - if resp.status != 200: - error_text = await resp.text() - raise Exception(f"HTTP {resp.status}: {error_text}") - - Logger.debug("开始接收响应流...") - - async for line in resp.content: - decoded = line.decode("utf-8").strip() - if not decoded or not decoded.startswith("data: "): - continue - - data_str = decoded[6:] - if data_str.strip() == "[DONE]": - if buffer.strip() and not first_sentence_received: - Logger.debug(f"流结束,返回剩余内容: {repr(buffer)}") - return buffer.strip(), buffer, resp - break - - try: - chunk = json.loads(data_str) - if "choices" in chunk and chunk["choices"]: - content = chunk["choices"][0]["delta"].get("content", "") - if content: - Logger.debug(f"收到内容块: {repr(content)}") - buffer += content - - # 检查是否形成完整句子 - m = re.search(r"[。!?!?\.]", buffer) - if m: - sentence = buffer[: m.end()].strip().replace("\n", "") - if sentence: - first_sentence_time = time.monotonic() - first_sentence_received = True - Logger.debug(f"首句耗时: {first_sentence_time - t_start:.3f} 秒") - Logger.debug(f"生成首句: {repr(sentence)}") - return sentence, buffer[m.end() :], resp - except json.JSONDecodeError as e: - Logger.warning(f"JSON解析错误: {e}") - continue - except Exception as e: - Logger.error(f"解析响应块时出错: {e}") - continue - - # 如果没有找到完整句子但有内容 - if buffer.strip(): - Logger.debug(f"未找到完整句子,返回现有内容: {repr(buffer)}") - return buffer.strip(), "", resp - - return None, "", resp - - # 使用统一的超时时间 - Logger.debug(f"等待首句响应(总超时时间: {first_sentence_timeout}秒)...") + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + Logger.debug("正在建立连接...") + async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp: + Logger.debug(f"连接已建立,状态码: {resp.status}") + if resp.status != 200: + error_text = await resp.text() + raise Exception(f"HTTP {resp.status}: {error_text}") + + Logger.debug("开始接收响应流...") + + # 首句超时检查 + first_sentence_start_time = time.monotonic() + + async for line in resp.content: + decoded = line.decode("utf-8").strip() + if not decoded or not decoded.startswith("data: "): + continue + + data_str = decoded[6:] + if data_str.strip() == "[DONE]": + Logger.debug("收到 [DONE] 标记,流结束") + # 处理剩余内容 + if buffer.strip(): + Logger.debug(f"处理剩余内容: {repr(buffer)}") + st.session_state[session_keys["text_response"]] += buffer.strip() + yield buffer.strip() + request_completed = True + break + + try: + chunk = json.loads(data_str) + if "choices" in chunk and chunk["choices"]: + content = chunk["choices"][0]["delta"].get("content", "") + if content: + Logger.debug(f"收到内容块: {repr(content)}") + buffer += content + + # 检查首句超时 + if not first_sentence_received: + current_time = time.monotonic() + if current_time - first_sentence_start_time > first_sentence_timeout: + Logger.warning(f"首句超时({first_sentence_timeout}秒)") + raise TimeoutError("首句获取超时") + + # 检查是否形成完整句子 + while True: + m = re.search(r"[。!?!?\.]", buffer) + if m: + sentence = buffer[: m.end()].strip().replace("\n", "") + if sentence: + if not first_sentence_received: + first_sentence_time = time.monotonic() + first_sentence_received = True + Logger.debug( + f"首句耗时: {first_sentence_time - t_start:.3f} 秒" + ) + Logger.info(f"首句接收成功: {repr(sentence)}") + else: + Logger.debug(f"后续句子: {repr(sentence)}") + + st.session_state[session_keys["text_response"]] += sentence + yield sentence + buffer = buffer[m.end() :] + else: + break + + except json.JSONDecodeError as e: + Logger.warning(f"JSON解析错误: {e}") + continue + except Exception as e: + Logger.error(f"解析响应块时出错: {e}") + continue + + # 如果循环正常结束但没有收到 [DONE] + if not request_completed and buffer.strip(): + Logger.debug(f"流异常结束,处理剩余内容: {repr(buffer)}") + st.session_state[session_keys["text_response"]] += buffer.strip() + yield buffer.strip() + request_completed = True + + except Exception as e: + Logger.error(f"流处理过程中出错: {e}") + raise + + # 执行流处理 + Logger.debug(f"开始处理完整流(首句超时: {first_sentence_timeout}秒)...") + t_start = time.monotonic() try: - result = await asyncio.wait_for(get_first_sentence_with_timeout(data), timeout=first_sentence_timeout) - - first_sentence, remaining_buffer, resp = result - - if first_sentence: - Logger.info(f"首句接收成功: {repr(first_sentence)}") - st.session_state[session_keys["text_response"]] += first_sentence - yield first_sentence - - # 首句成功后,处理剩余内容 - Logger.debug("首句成功,继续处理后续内容...") - - try: - buffer = remaining_buffer - - # 继续处理剩余的响应流 - async for line in resp.content: - decoded = line.decode("utf-8").strip() - if not decoded or not decoded.startswith("data: "): - continue - - data_str = decoded[6:] - if data_str.strip() == "[DONE]": - if buffer.strip(): - Logger.debug(f"处理剩余内容: {repr(buffer)}") - st.session_state[session_keys["text_response"]] += buffer.strip() - yield buffer.strip() - break - - try: - chunk = json.loads(data_str) - if "choices" in chunk and chunk["choices"]: - content = chunk["choices"][0]["delta"].get("content", "") - if content: - buffer += content - - # 检查是否形成完整句子 - while True: - m = re.search(r"[。!?!?\.]", buffer) - if m: - sentence = buffer[: m.end()].strip().replace("\n", "") - if sentence: - st.session_state[session_keys["text_response"]] += sentence - Logger.debug(f"后续句子: {repr(sentence)}") - yield sentence - buffer = buffer[m.end() :] - else: - break - except json.JSONDecodeError as e: - Logger.warning(f"JSON解析错误: {e}") - continue - except Exception as e: - Logger.error(f"解析后续响应时出错: {e}") - continue - - except Exception as e: - Logger.warning(f"处理后续内容时出错: {e}") - - # 成功完成 - t_end = time.monotonic() - Logger.debug(f"openai 总耗时: {t_end - time.monotonic():.3f} 秒") + async for sentence in process_complete_stream(data): + yield sentence + # 检查是否成功完成 + if request_completed and first_sentence_received: + t_end = time.monotonic() + Logger.debug(f"OpenAI 总耗时: {t_end - t_start:.3f} 秒") st.session_state[session_keys["short_term_memory"]].append( {"role": "assistant", "content": st.session_state[session_keys["text_response"]]} ) - Logger.debug(f"短期记忆:{st.session_state[session_keys['short_term_memory']]}") - + Logger.debug(f"短期记忆: {st.session_state[session_keys['short_term_memory']]}") Logger.info("OpenAI请求成功完成") return - + elif not first_sentence_received: + Logger.warning("未收到首句,需要重试") + raise Exception("未收到首句") else: - Logger.warning("未收到有效的首句内容") + Logger.warning("请求未完整完成,需要重试") + raise Exception("请求未完整完成") - except TimeoutError: - Logger.warning(f"首句获取总超时({first_sentence_timeout}秒)") + except TimeoutError as e: + Logger.warning(f"请求超时: {e}") + raise + except Exception as e: + Logger.error(f"流处理失败: {e}") + raise except Exception as e: Logger.error(f"第 {attempt + 1} 次请求出现异常: {e}") @@ -263,7 +241,7 @@ async def get_first_sentence_with_timeout(data): Logger.debug(f"异常详情: {traceback.format_exc()}") # 重试逻辑 - if not first_sentence_received and attempt < max_retries - 1: + if attempt < max_retries - 1: # 从短期记忆中移除当前的用户消息,下次循环会重新添加 if ( st.session_state[session_keys["short_term_memory"]] @@ -275,9 +253,6 @@ async def get_first_sentence_with_timeout(data): Logger.info(f"等待 {wait_time} 秒后重试...") await asyncio.sleep(wait_time) continue - elif first_sentence_received: - Logger.info("首句已成功接收") - return else: break