Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 114 additions & 139 deletions src/chatbot/api/async_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,10 @@ async def get_openai_response_stream(
settings: ServiceSettings = load_settings_file("config.toml", ServiceSettings)
OPENAI_API_KEY: str = settings.sdk_key
OPENAI_ENDPOINT: str = settings.sdk_base_url + "/v1/chat/completions"

headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}

if len(st.session_state[session_keys["short_term_memory"]]) == 0:
st.session_state[session_keys["short_term_memory"]].append({"role": "system", "content": SYSTEMPROMOT})

Expand All @@ -77,14 +75,13 @@ async def get_openai_response_stream(
Logger.custom(
f"发送OpenAI请求 (第 {attempt + 1}/{max_retries} 次)", badge=Badge("连接", fore="black", back="cyan")
)

try:
# 重置响应文本
st.session_state[session_keys["text_response"]] = ""
first_sentence_received = False
request_completed = False

# 确保用户消息在短期记忆中(每次重试都重新添加)
# 先检查最后一条是否是相同的用户消息,如果不是则添加
if (
not st.session_state[session_keys["short_term_memory"]]
or st.session_state[session_keys["short_term_memory"]][-1] != user_message
Expand All @@ -104,157 +101,138 @@ async def get_openai_response_stream(
"stream": True,
}

# 添加调试日志,显示发送的消息
Logger.info(f"发送的消息: {data['messages']}")

# 使用统一的超时时间控制整个首句获取过程
async def get_first_sentence_with_timeout(data):
nonlocal first_sentence_received

# 完整的流处理函数
# NOTE(review): this span looks like a rendered diff whose indentation and +/-
# markers were stripped. Lines of the removed helper
# `get_first_sentence_with_timeout` (which returns a
# `(sentence, remaining_buffer, resp)` tuple) appear interleaved with lines of
# the added async generator `process_complete_stream` (which yields sentences).
# Confirm against the real file before changing any code here.
#
# process_complete_stream(data): POSTs `data` as JSON to OPENAI_ENDPOINT, reads
# the response line by line, keeps only lines starting with "data: ",
# accumulates each chunk's `choices[0].delta.content` into `buffer`, and yields
# every sentence that ends with one of the punctuation marks in the regex below.
# Each yielded piece is also appended to
# st.session_state[session_keys["text_response"]].
# Side effects on the enclosing scope: sets `first_sentence_received` once the
# first sentence is produced and `request_completed` when "[DONE]" is seen (or
# when the stream ends with leftover buffered text).
# Raises: Exception("HTTP <status>: <body>") when the response status is not
# 200; any exception from the stream handling is logged and re-raised by the
# final `except Exception` in this span.
async def process_complete_stream(data):
nonlocal first_sentence_received, request_completed
buffer = ""
t_start = time.monotonic()
first_sentence_time = None

# Configure timeouts
timeout = aiohttp.ClientTimeout(
# NOTE(review): two argument sets follow — presumably the removed line
# (connect derived from first_sentence_timeout) and its replacement
# (fixed 10.0 s connect) from the diff; only one can be live code.
total=request_timeout, connect=first_sentence_timeout - 0.5, sock_read=request_timeout
total=request_timeout,
connect=10.0, # connection timeout
sock_read=request_timeout, # read timeout
)

async with aiohttp.ClientSession(timeout=timeout) as session:
Logger.debug("正在建立连接...")

async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp:
Logger.debug(f"连接已建立,状态码: {resp.status}")

if resp.status != 200:
error_text = await resp.text()
raise Exception(f"HTTP {resp.status}: {error_text}")

Logger.debug("开始接收响应流...")

async for line in resp.content:
decoded = line.decode("utf-8").strip()
if not decoded or not decoded.startswith("data: "):
continue

# Strip the 6-character "data: " prefix checked just above
data_str = decoded[6:]
if data_str.strip() == "[DONE]":
if buffer.strip() and not first_sentence_received:
Logger.debug(f"流结束,返回剩余内容: {repr(buffer)}")
return buffer.strip(), buffer, resp
break

try:
chunk = json.loads(data_str)
if "choices" in chunk and chunk["choices"]:
content = chunk["choices"][0]["delta"].get("content", "")
if content:
Logger.debug(f"收到内容块: {repr(content)}")
buffer += content

# Check whether a complete sentence has formed
m = re.search(r"[。!?!?\.]", buffer)
if m:
sentence = buffer[: m.end()].strip().replace("\n", "")
if sentence:
first_sentence_time = time.monotonic()
first_sentence_received = True
Logger.debug(f"首句耗时: {first_sentence_time - t_start:.3f} 秒")
Logger.debug(f"生成首句: {repr(sentence)}")
return sentence, buffer[m.end() :], resp
except json.JSONDecodeError as e:
Logger.warning(f"JSON解析错误: {e}")
continue
except Exception as e:
Logger.error(f"解析响应块时出错: {e}")
continue

# No complete sentence was found, but some content was received
if buffer.strip():
Logger.debug(f"未找到完整句子,返回现有内容: {repr(buffer)}")
return buffer.strip(), "", resp

return None, "", resp

# Use a unified timeout
Logger.debug(f"等待首句响应(总超时时间: {first_sentence_timeout}秒)...")
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
Logger.debug("正在建立连接...")
async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp:
Logger.debug(f"连接已建立,状态码: {resp.status}")
if resp.status != 200:
error_text = await resp.text()
raise Exception(f"HTTP {resp.status}: {error_text}")

Logger.debug("开始接收响应流...")

# First-sentence timeout check: reference point for the elapsed-time test below
first_sentence_start_time = time.monotonic()

async for line in resp.content:
decoded = line.decode("utf-8").strip()
if not decoded or not decoded.startswith("data: "):
continue

# Strip the 6-character "data: " prefix checked just above
data_str = decoded[6:]
if data_str.strip() == "[DONE]":
Logger.debug("收到 [DONE] 标记,流结束")
# Flush whatever is still buffered
if buffer.strip():
Logger.debug(f"处理剩余内容: {repr(buffer)}")
st.session_state[session_keys["text_response"]] += buffer.strip()
yield buffer.strip()
request_completed = True
break

try:
chunk = json.loads(data_str)
if "choices" in chunk and chunk["choices"]:
content = chunk["choices"][0]["delta"].get("content", "")
if content:
Logger.debug(f"收到内容块: {repr(content)}")
buffer += content

# Check the first-sentence timeout
# NOTE(review): this check appears to run only after a non-empty content
# chunk arrives, so a stalled stream would not trip it — confirm.
if not first_sentence_received:
current_time = time.monotonic()
if current_time - first_sentence_start_time > first_sentence_timeout:
Logger.warning(f"首句超时({first_sentence_timeout}秒)")
# NOTE(review): this raise sits between the `try:` above and the
# `except Exception ... continue` below. If it is inside that try
# (indentation is not visible here), the TimeoutError would be logged
# as a chunk-parse error and skipped instead of propagating — confirm.
raise TimeoutError("首句获取超时")

# Check whether a complete sentence has formed; loop to drain every
# complete sentence currently in the buffer
while True:
m = re.search(r"[。!?!?\.]", buffer)
if m:
sentence = buffer[: m.end()].strip().replace("\n", "")
if sentence:
if not first_sentence_received:
first_sentence_time = time.monotonic()
first_sentence_received = True
Logger.debug(
f"首句耗时: {first_sentence_time - t_start:.3f} 秒"
)
Logger.info(f"首句接收成功: {repr(sentence)}")
else:
Logger.debug(f"后续句子: {repr(sentence)}")

st.session_state[session_keys["text_response"]] += sentence
yield sentence
# Drop the consumed sentence (through the matched punctuation)
buffer = buffer[m.end() :]
else:
break

except json.JSONDecodeError as e:
Logger.warning(f"JSON解析错误: {e}")
continue
except Exception as e:
Logger.error(f"解析响应块时出错: {e}")
continue

# The loop ended normally but no [DONE] marker was received
if not request_completed and buffer.strip():
Logger.debug(f"流异常结束,处理剩余内容: {repr(buffer)}")
st.session_state[session_keys["text_response"]] += buffer.strip()
yield buffer.strip()
request_completed = True

except Exception as e:
Logger.error(f"流处理过程中出错: {e}")
raise

# 执行流处理
Logger.debug(f"开始处理完整流(首句超时: {first_sentence_timeout}秒)...")
t_start = time.monotonic()

try:
result = await asyncio.wait_for(get_first_sentence_with_timeout(data), timeout=first_sentence_timeout)

first_sentence, remaining_buffer, resp = result

if first_sentence:
Logger.info(f"首句接收成功: {repr(first_sentence)}")
st.session_state[session_keys["text_response"]] += first_sentence
yield first_sentence

# 首句成功后,处理剩余内容
Logger.debug("首句成功,继续处理后续内容...")

try:
buffer = remaining_buffer

# 继续处理剩余的响应流
async for line in resp.content:
decoded = line.decode("utf-8").strip()
if not decoded or not decoded.startswith("data: "):
continue

data_str = decoded[6:]
if data_str.strip() == "[DONE]":
if buffer.strip():
Logger.debug(f"处理剩余内容: {repr(buffer)}")
st.session_state[session_keys["text_response"]] += buffer.strip()
yield buffer.strip()
break

try:
chunk = json.loads(data_str)
if "choices" in chunk and chunk["choices"]:
content = chunk["choices"][0]["delta"].get("content", "")
if content:
buffer += content

# 检查是否形成完整句子
while True:
m = re.search(r"[。!?!?\.]", buffer)
if m:
sentence = buffer[: m.end()].strip().replace("\n", "")
if sentence:
st.session_state[session_keys["text_response"]] += sentence
Logger.debug(f"后续句子: {repr(sentence)}")
yield sentence
buffer = buffer[m.end() :]
else:
break
except json.JSONDecodeError as e:
Logger.warning(f"JSON解析错误: {e}")
continue
except Exception as e:
Logger.error(f"解析后续响应时出错: {e}")
continue

except Exception as e:
Logger.warning(f"处理后续内容时出错: {e}")

# 成功完成
t_end = time.monotonic()
Logger.debug(f"openai 总耗时: {t_end - time.monotonic():.3f} 秒")
async for sentence in process_complete_stream(data):
yield sentence

# 检查是否成功完成
if request_completed and first_sentence_received:
t_end = time.monotonic()
Logger.debug(f"OpenAI 总耗时: {t_end - t_start:.3f} 秒")
st.session_state[session_keys["short_term_memory"]].append(
{"role": "assistant", "content": st.session_state[session_keys["text_response"]]}
)
Logger.debug(f"短期记忆:{st.session_state[session_keys['short_term_memory']]}")

Logger.debug(f"短期记忆: {st.session_state[session_keys['short_term_memory']]}")
Logger.info("OpenAI请求成功完成")
return

elif not first_sentence_received:
Logger.warning("未收到首句,需要重试")
raise Exception("未收到首句")
else:
Logger.warning("未收到有效的首句内容")
Logger.warning("请求未完整完成,需要重试")
raise Exception("请求未完整完成")

except TimeoutError:
Logger.warning(f"首句获取总超时({first_sentence_timeout}秒)")
except TimeoutError as e:
Logger.warning(f"请求超时: {e}")
raise
except Exception as e:
Logger.error(f"流处理失败: {e}")
raise

except Exception as e:
Logger.error(f"第 {attempt + 1} 次请求出现异常: {e}")
Expand All @@ -263,7 +241,7 @@ async def get_first_sentence_with_timeout(data):
Logger.debug(f"异常详情: {traceback.format_exc()}")

# 重试逻辑
if not first_sentence_received and attempt < max_retries - 1:
if attempt < max_retries - 1:
# 从短期记忆中移除当前的用户消息,下次循环会重新添加
if (
st.session_state[session_keys["short_term_memory"]]
Expand All @@ -275,9 +253,6 @@ async def get_first_sentence_with_timeout(data):
Logger.info(f"等待 {wait_time} 秒后重试...")
await asyncio.sleep(wait_time)
continue
elif first_sentence_received:
Logger.info("首句已成功接收")
return
else:
break

Expand Down