diff --git "a/models/keywords_spotting/\346\264\276\350\222\231_zh_linux_v3_0_0.ppn" "b/models/keywords_spotting/\346\264\276\350\222\231_zh_linux_v3_0_0.ppn" new file mode 100644 index 0000000..03ad5fa Binary files /dev/null and "b/models/keywords_spotting/\346\264\276\350\222\231_zh_linux_v3_0_0.ppn" differ diff --git a/src/chatbot/__main__.py b/src/chatbot/__main__.py index 04085c3..63fc482 100644 --- a/src/chatbot/__main__.py +++ b/src/chatbot/__main__.py @@ -1,16 +1,23 @@ +# type: ignore from __future__ import annotations import asyncio import logging import signal from pathlib import Path +from typing import TYPE_CHECKING from chatbot.chatter.audio_processing_workflow import initialize_session_state, process_voice_command +from chatbot.chatter.voice_activity_monitor import VoiceActivityMonitor # 新增 from chatbot.chatter.voice_recorder import VoiceRecorder from chatbot.chatter.wake_word_detector import WakeWordDetector from chatbot.config_manager import ServiceSettings, load_settings_file from chatbot.console.logger import Logger, set_logger_debug +if TYPE_CHECKING: + import numpy as np + from numpy.typing import NDArray + # 配置日志 def configure_logging() -> None: @@ -18,8 +25,8 @@ def configure_logging() -> None: logging.getLogger(name) for name in logging.root.manager.loggerDict if name.startswith("streamlit") ] for logger in streamlit_loggers: - logger.setLevel(logging.ERROR) # 裸模式(非 streamlit run 交互式)运行会带来很多警告信息,这里将其设置为ERROR级别 - set_logger_debug() # 设置日志记录器为调试模式 + logger.setLevel(logging.ERROR) + set_logger_debug() class VoiceAssistant: @@ -29,33 +36,89 @@ def __init__(self): self.settings = load_settings_file("config.toml", ServiceSettings) self.cache_dir = Path(self.settings.cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) + self.wake_detector = WakeWordDetector( access_key=self.settings.access_key, system_platform=self.settings.system_platform ) self.voice_recorder = VoiceRecorder(cache_dir=self.cache_dir) + self.voice_monitor = VoiceActivityMonitor(cache_dir=self.cache_dir) # 新增 + + self.is_awake = False # 跟踪唤醒状态 + self.keep_alive_timeout = 10.0 # 保持唤醒的超时时间(秒) async def start(self) -> None: """启动程序:监听唤醒词并录音""" running = True while running: - # 监听唤醒词 - wakeup_success = await self.wake_detector.listen_for_wakeup() - if wakeup_success: - # 唤醒后开始录音 - asr_result = await self.voice_recorder.record_and_process() - if asr_result: - # 处理ASR结果 - await process_voice_command(asr_result, self.cache_dir) - Logger.info("录音已完成,重新进入唤醒词监听状态") - await asyncio.sleep(0.5) + if not self.is_awake: + # 未唤醒状态:监听唤醒词 + wakeup_success = await self.wake_detector.listen_for_wakeup() + if wakeup_success: + self.is_awake = True + Logger.info("已唤醒,进入对话模式") + # 首次唤醒后立即开始录音 + await self.handle_conversation() + else: + Logger.warning("唤醒词检测失败") + running = False else: - Logger.warning("唤醒词检测失败") - running = False + # 已唤醒状态:保持活跃并监听语音活动 + await self.keep_alive_and_listen() + await self.stop() + async def keep_alive_and_listen(self) -> None: + """保持唤醒状态并监听语音活动""" + Logger.info(f"保持唤醒状态,监听语音活动(超时时间:{self.keep_alive_timeout}秒)") + + try: + # 监听语音活动,带超时,现在返回音频数据 + result = await asyncio.wait_for( + self.voice_monitor.listen_for_voice_activity(), timeout=self.keep_alive_timeout + ) + + voice_detected, pre_collected_audio = result + + if voice_detected: + Logger.info("检测到语音活动,开始新的对话") + # 将预收集的音频传递给录音器 + if pre_collected_audio is not None: + await self.handle_conversation_with_pre_audio(pre_collected_audio) + else: + await self.handle_conversation() + else: + Logger.info("未检测到语音活动") + self.is_awake = False + + except TimeoutError: + Logger.info(f"{self.keep_alive_timeout}秒内未检测到语音活动,进入睡眠状态") + self.is_awake = False + except Exception as e: + Logger.error(f"语音活动监听出错: {e}") + self.is_awake = False + + async def handle_conversation_with_pre_audio(self, pre_collected_audio: NDArray[np.float32]) -> None: + """处理带有预收集音频的对话""" + asr_result = await self.voice_recorder.record_with_pre_collected_audio(pre_collected_audio) + if asr_result: + await process_voice_command(asr_result, self.cache_dir) + Logger.info("对话处理完成") + else: + Logger.warning("未获取到有效的语音输入") + + async def handle_conversation(self) -> None: + """处理一次完整的对话""" + asr_result = await self.voice_recorder.record_and_process() + if asr_result: + await process_voice_command(asr_result, self.cache_dir) + Logger.info("对话处理完成") + else: + Logger.warning("未获取到有效的语音输入") + async def stop(self) -> None: """停止程序""" await self.wake_detector.cleanup_resources() + await self.voice_monitor.cleanup_resources() Logger.info("程序已停止") diff --git a/src/chatbot/api/async_api.py b/src/chatbot/api/async_api.py index d8602e8..c45456b 100644 --- a/src/chatbot/api/async_api.py +++ b/src/chatbot/api/async_api.py @@ -1,3 +1,5 @@ +# type: ignore + from __future__ import annotations import asyncio @@ -14,7 +16,7 @@ from chatbot._dictionary import session_keys from chatbot.config_manager import ServiceSettings, load_settings_file -from chatbot.console.logger import Logger +from chatbot.console.logger import Badge, Logger from chatbot.tools.audio import file_to_opus, file_to_wav, play_opus_file if TYPE_CHECKING: @@ -47,75 +49,245 @@ async def get_openai_response_stream( stop: list[str] | None = None, presence_penalty: float = 0, frequency_penalty: float = 0, + max_retries: int = 3, + first_sentence_timeout: float = 3.5, + request_timeout: float = 30.0, ): """ 获取OpenAI API的响应(流式,异步) + 修复重试时用户消息丢失的问题 """ settings: ServiceSettings = load_settings_file("config.toml", ServiceSettings) OPENAI_API_KEY: str = settings.sdk_key OPENAI_ENDPOINT: str = settings.sdk_base_url + "/v1/chat/completions" + headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json", } + if len(st.session_state[session_keys["short_term_memory"]]) == 0: - # 如果短期记忆为空,添加系统提示 st.session_state[session_keys["short_term_memory"]].append({"role": "system", "content": SYSTEMPROMOT}) - st.session_state[session_keys["short_term_memory"]].append({"role": "user", "content": prompt}) - data = { - "model": model, - "messages": st.session_state[session_keys["short_term_memory"]], - "max_tokens": max_tokens, - "temperature": temperature, - "n": n, - "stop": stop, - "presence_penalty": presence_penalty, - "frequency_penalty": frequency_penalty, - "stream": True, - } - buffer = "" - t_start = time.monotonic() # 发起请求前的时间 - first_sentence_time = None - async with aiohttp.ClientSession() as session: - async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp: - async for line in resp.content: - decoded = line.decode("utf-8").strip() - if not decoded or not decoded.startswith("data: "): - continue - data_str = decoded[6:] - if data_str.strip() == "[DONE]": - if buffer: - yield buffer - break - try: - chunk = json.loads(data_str) - if "choices" in chunk and chunk["choices"]: - content = chunk["choices"][0]["delta"].get("content", "") - buffer += content - while True: - m = re.search(r"[。!?!?\.]", buffer) - if m: - sentence = buffer[: m.end()].strip().replace("\n", "") - # 第一次分句,记录耗时 - if first_sentence_time is None: - first_sentence_time = time.monotonic() - Logger.debug(f"首句耗时: {first_sentence_time - t_start:.3f} 秒") - st.session_state[session_keys["text_response"]] += sentence - yield sentence - buffer = buffer[m.end() :] - else: + # 保存原始的用户消息,用于重试 + user_message = {"role": "user", "content": prompt} + + # 重试循环 + for attempt in range(max_retries): + Logger.custom( + f"发送OpenAI请求 (第 {attempt + 1}/{max_retries} 次)", badge=Badge("连接", fore="black", back="cyan") + ) + + try: + # 重置响应文本 + st.session_state[session_keys["text_response"]] = "" + first_sentence_received = False + + # 确保用户消息在短期记忆中(每次重试都重新添加) + # 先检查最后一条是否是相同的用户消息,如果不是则添加 + if ( + not st.session_state[session_keys["short_term_memory"]] + or st.session_state[session_keys["short_term_memory"]][-1] != user_message + ): + st.session_state[session_keys["short_term_memory"]].append(user_message) + + # 准备请求数据 + data = { + "model": model, + "messages": st.session_state[session_keys["short_term_memory"]], + "max_tokens": max_tokens, + "temperature": temperature, + "n": n, + "stop": stop, + "presence_penalty": presence_penalty, + "frequency_penalty": frequency_penalty, + "stream": True, + } + + # 添加调试日志,显示发送的消息 + Logger.info(f"发送的消息: {data['messages']}") + + # 使用统一的超时时间控制整个首句获取过程 + async def get_first_sentence_with_timeout(data): + nonlocal first_sentence_received + + buffer = "" + t_start = time.monotonic() + first_sentence_time = None + + # 设置超时 + timeout = aiohttp.ClientTimeout( + total=request_timeout, connect=first_sentence_timeout - 0.5, sock_read=request_timeout + ) + + async with aiohttp.ClientSession(timeout=timeout) as session: + Logger.debug("正在建立连接...") + + async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp: + Logger.debug(f"连接已建立,状态码: {resp.status}") + + if resp.status != 200: + error_text = await resp.text() + raise Exception(f"HTTP {resp.status}: {error_text}") + + Logger.debug("开始接收响应流...") + + async for line in resp.content: + decoded = line.decode("utf-8").strip() + if not decoded or not decoded.startswith("data: "): + continue + + data_str = decoded[6:] + if data_str.strip() == "[DONE]": + if buffer.strip() and not first_sentence_received: + Logger.debug(f"流结束,返回剩余内容: {repr(buffer)}") + return buffer.strip(), buffer, resp + break + + try: + chunk = json.loads(data_str) + if "choices" in chunk and chunk["choices"]: + content = chunk["choices"][0]["delta"].get("content", "") + if content: + Logger.debug(f"收到内容块: {repr(content)}") + buffer += content + + # 检查是否形成完整句子 + m = re.search(r"[。!?!?\.]", buffer) + if m: + sentence = buffer[: m.end()].strip().replace("\n", "") + if sentence: + first_sentence_time = time.monotonic() + first_sentence_received = True + Logger.debug(f"首句耗时: {first_sentence_time - t_start:.3f} 秒") + Logger.debug(f"生成首句: {repr(sentence)}") + return sentence, buffer[m.end() :], resp + except json.JSONDecodeError as e: + Logger.warning(f"JSON解析错误: {e}") + continue + except Exception as e: + Logger.error(f"解析响应块时出错: {e}") + continue + + # 如果没有找到完整句子但有内容 + if buffer.strip(): + Logger.debug(f"未找到完整句子,返回现有内容: {repr(buffer)}") + return buffer.strip(), "", resp + + return None, "", resp + + # 使用统一的超时时间 + Logger.debug(f"等待首句响应(总超时时间: {first_sentence_timeout}秒)...") + + try: + result = await asyncio.wait_for(get_first_sentence_with_timeout(data), timeout=first_sentence_timeout) + + first_sentence, remaining_buffer, resp = result + + if first_sentence: + Logger.info(f"首句接收成功: {repr(first_sentence)}") + st.session_state[session_keys["text_response"]] += first_sentence + yield first_sentence + + # 首句成功后,处理剩余内容 + Logger.debug("首句成功,继续处理后续内容...") + + try: + buffer = remaining_buffer + + # 继续处理剩余的响应流 + async for line in resp.content: + decoded = line.decode("utf-8").strip() + if not decoded or not decoded.startswith("data: "): + continue + + data_str = decoded[6:] + if data_str.strip() == "[DONE]": + if buffer.strip(): + Logger.debug(f"处理剩余内容: {repr(buffer)}") + st.session_state[session_keys["text_response"]] += buffer.strip() + yield buffer.strip() break - except Exception as e: - Logger.error(f"{e}") - # 可选,总耗时打印 - t_end = time.monotonic() - Logger.debug(f"openai 总耗时: {t_end - t_start:.3f} 秒") + try: + chunk = json.loads(data_str) + if "choices" in chunk and chunk["choices"]: + content = chunk["choices"][0]["delta"].get("content", "") + if content: + buffer += content + + # 检查是否形成完整句子 + while True: + m = re.search(r"[。!?!?\.]", buffer) + if m: + sentence = buffer[: m.end()].strip().replace("\n", "") + if sentence: + st.session_state[session_keys["text_response"]] += sentence + Logger.debug(f"后续句子: {repr(sentence)}") + yield sentence + buffer = buffer[m.end() :] + else: + break + except json.JSONDecodeError as e: + Logger.warning(f"JSON解析错误: {e}") + continue + except Exception as e: + Logger.error(f"解析后续响应时出错: {e}") + continue + + except Exception as e: + Logger.warning(f"处理后续内容时出错: {e}") + + # 成功完成 + t_end = time.monotonic() + Logger.debug(f"openai 总耗时: {t_end - time.monotonic():.3f} 秒") + + st.session_state[session_keys["short_term_memory"]].append( + {"role": "assistant", "content": st.session_state[session_keys["text_response"]]} + ) + Logger.debug(f"短期记忆:{st.session_state[session_keys['short_term_memory']]}") + + Logger.info("OpenAI请求成功完成") + return + + else: + Logger.warning("未收到有效的首句内容") + + except TimeoutError: + Logger.warning(f"首句获取总超时({first_sentence_timeout}秒)") + + except Exception as e: + Logger.error(f"第 {attempt + 1} 次请求出现异常: {e}") + import traceback + + Logger.debug(f"异常详情: {traceback.format_exc()}") + + # 重试逻辑 + if not first_sentence_received and attempt < max_retries - 1: + # 从短期记忆中移除当前的用户消息,下次循环会重新添加 + if ( + st.session_state[session_keys["short_term_memory"]] + and st.session_state[session_keys["short_term_memory"]][-1]["role"] == "user" + ): + st.session_state[session_keys["short_term_memory"]].pop() + + wait_time = 0.5 + attempt * 0.5 + Logger.info(f"等待 {wait_time} 秒后重试...") + await asyncio.sleep(wait_time) + continue + elif first_sentence_received: + Logger.info("首句已成功接收") + return + else: + break + + # 所有重试失败 + Logger.error("所有重试都失败了,返回默认响应") + st.session_state[session_keys["text_response"]] = "抱歉,我现在无法正常回应,请稍后再试。" st.session_state[session_keys["short_term_memory"]].append( {"role": "assistant", "content": st.session_state[session_keys["text_response"]]} ) - Logger.debug(f"短期记忆:{st.session_state[session_keys['short_term_memory']]}") + yield "抱歉,我现在无法正常回应,请稍后再试。" async def async_get_tts_response(text: str): diff --git a/src/chatbot/chatter/__init__.py b/src/chatbot/chatter/__init__.py index 934d2c3..f499f67 100644 --- a/src/chatbot/chatter/__init__.py +++ b/src/chatbot/chatter/__init__.py @@ -3,7 +3,7 @@ # 音频参数 RATE = 16000 # 采样率 CHANNELS = 1 # 单声道 -SILENCE_THRESHOLD = 1000 # 静音检测阈值(毫秒) +SILENCE_THRESHOLD = 600 # 静音检测阈值(毫秒) MAX_RECORDING_TIME = 30 # 最长录音时间(秒) -SEGMENT_DURATION = 1.5 # 每个VAD检测段的持续时间(秒) -INITIAL_WAIT_TIME = 3.0 # 初始等待时间(秒) +SEGMENT_DURATION = 0.8 # 每个VAD检测段的持续时间(秒) +INITIAL_WAIT_TIME = 1.5 # 初始等待时间(秒) diff --git a/src/chatbot/chatter/audio_processing_workflow.py b/src/chatbot/chatter/audio_processing_workflow.py index f658841..69bfaab 100644 --- a/src/chatbot/chatter/audio_processing_workflow.py +++ b/src/chatbot/chatter/audio_processing_workflow.py @@ -66,21 +66,38 @@ async def play_worker(cache_dir: Path) -> None: async def process_voice_command(prompt: str, cache_dir: Path) -> None: """处理语音命令的完整流程""" + Logger.info(f"开始处理语音命令: {prompt}") # 添加这行日志 + # 确保缓存目录存在 (cache_dir / "tts").mkdir(parents=True, exist_ok=True) - # 创建任务 - producer = asyncio.create_task(sentence_producer(prompt)) - tts = asyncio.create_task(tts_worker(cache_dir / "tts")) - play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) + try: + # 创建任务 + producer = asyncio.create_task(sentence_producer(prompt)) + tts = asyncio.create_task(tts_worker(cache_dir / "tts")) + play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) + + Logger.info("已创建处理任务,等待生产者完成...") # 添加这行日志 + + # 等待生产者完成 + await producer + + Logger.info("生产者完成,等待队列处理...") # 添加这行日志 + + # 等待所有队列处理完毕 + await st.session_state[session_keys["sentence_que"]].join() + await st.session_state[session_keys["tts_que"]].join() + + Logger.info("队列处理完成,取消剩余任务...") # 添加这行日志 + + # 取消剩余任务 + tts.cancel() + play.cancel() - # 等待生产者完成 - await producer + Logger.info("语音命令处理完成") # 添加这行日志 - # 等待所有队列处理完毕 - await st.session_state[session_keys["sentence_que"]].join() - await st.session_state[session_keys["tts_que"]].join() + except Exception as e: + Logger.error(f"处理语音命令时出错: {e}") + import traceback - # 取消剩余任务 - tts.cancel() - play.cancel() + Logger.error(f"错误详情: {traceback.format_exc()}") diff --git a/src/chatbot/chatter/util.py b/src/chatbot/chatter/util.py index 3742b95..9971a6b 100644 --- a/src/chatbot/chatter/util.py +++ b/src/chatbot/chatter/util.py @@ -11,7 +11,11 @@ def handle_porcupine_keyword(system_platform: SystemPlatform): key_words = ["你好"] """获取 Porcupine 关键词和关键词特征文件路径""" if system_platform == "linux": - keyword_paths = ["./models/keywords_spotting/你好_linux.ppn"] + keyword_paths = [ + "./models/keywords_spotting/你好_linux.ppn", + "./models/keywords_spotting/派蒙_zh_linux_v3_0_0.ppn", + ] + key_words.extend(["派蒙"]) elif system_platform == "mac": keyword_paths = ["./models/keywords_spotting/你好_mac.ppn"] elif system_platform == "raspberry-pi": diff --git a/src/chatbot/chatter/voice_activity_monitor.py b/src/chatbot/chatter/voice_activity_monitor.py new file mode 100644 index 0000000..26262a5 --- /dev/null +++ b/src/chatbot/chatter/voice_activity_monitor.py @@ -0,0 +1,176 @@ +# type: ignore +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +import numpy as np +import sounddevice as sd +import soundfile as sf + +from chatbot.api.async_api import async_file_to_opus, async_get_vad_response +from chatbot.chatter import CHANNELS, RATE +from chatbot.console.logger import Logger +from chatbot.tools.timed_helper import get_time_tag_with_millis + +if TYPE_CHECKING: + from pathlib import Path + + from numpy.typing import NDArray + + +class VoiceActivityMonitor: + def __init__(self, cache_dir: Path): + self.cache_dir = cache_dir / "vad_monitor" + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.monitoring = False + self.audio_frames: list[NDArray[np.float32]] = [] + self.all_collected_frames: list[NDArray[np.float32]] = [] # 保存所有收集的音频 + self.check_interval = 1.0 # 每秒检查一次 + self.min_audio_length = 0.5 # 最小音频长度(秒) + + def audio_callback( + self, indata: NDArray[np.float32], _frames: int, _time: dict[str, float], status: sd.CallbackFlags + ) -> None: + """声音设备的回调函数""" + if status: + Logger.warning(f"音频回调状态: {status}") + if self.monitoring: + self.audio_frames.append(indata.copy()) + self.all_collected_frames.append(indata.copy()) # 同时保存到总集合中 + + async def save_audio_segment(self, audio_data: NDArray[np.float32]) -> Path: + """保存音频片段为Opus文件""" + time_label = get_time_tag_with_millis() + temp_wav_path = self.cache_dir / f"vad_check_{time_label}.wav" + opus_path = self.cache_dir / f"vad_check_{time_label}.opus" + + # 保存为WAV + sf.write(temp_wav_path, audio_data, RATE, format="WAV", subtype="PCM_16") + + # 转换为Opus + await async_file_to_opus(temp_wav_path, opus_path) + + # 删除临时WAV文件 + temp_wav_path.unlink() + + return opus_path + + async def check_voice_activity(self, audio_segment: NDArray[np.float32]) -> bool: + """检查音频片段是否包含语音活动""" + if len(audio_segment) == 0: + return False + + try: + # 保存音频片段 + audio_path = await self.save_audio_segment(audio_segment) + + # 发送到VAD服务 + vad_result = await async_get_vad_response(audio_path) + + # 清理临时文件 + if audio_path.exists(): + audio_path.unlink() + + if not vad_result: + return False + + # 检查是否有语音活动 + timestamps = vad_result.get("timestamp", []) + has_voice = len(timestamps) > 0 and any(len(ts) > 0 for ts in timestamps) + + if has_voice: + Logger.info(f"检测到语音活动: {timestamps}") + + return has_voice + + except Exception as e: + Logger.error(f"VAD检查出错: {e}") + return False + + async def listen_for_voice_activity(self) -> tuple[bool, NDArray[np.float32] | None]: + """监听语音活动,返回是否检测到语音以及收集的音频数据""" + self.monitoring = True + self.audio_frames = [] + self.all_collected_frames = [] # 重置总音频收集 + + # 计算每次检查需要的样本数 + samples_per_check = int(RATE * self.check_interval) + min_samples = int(RATE * self.min_audio_length) + + # 启动录音流 + stream = sd.InputStream(samplerate=RATE, channels=CHANNELS, callback=self.audio_callback) + stream.start() + + Logger.info("开始监听语音活动...") + + try: + while self.monitoring: + # 等待收集足够的音频数据 + await asyncio.sleep(self.check_interval) + + if not self.monitoring: + break + + # 检查是否有足够的音频数据 + total_samples = sum(len(frame) for frame in self.audio_frames) + + if total_samples >= min_samples: + # 提取音频片段进行检查 + frames_to_check = [] + samples_collected = 0 + + for frame in self.audio_frames: + frames_to_check.append(frame) + samples_collected += len(frame) + if samples_collected >= samples_per_check: + break + + if frames_to_check: + audio_segment = np.concatenate(frames_to_check, axis=0) + + # 检查语音活动 + has_voice = await self.check_voice_activity(audio_segment) + + if has_voice: + Logger.info("检测到语音活动!") + # 返回检测到语音活动以及到目前为止收集的所有音频 + if self.all_collected_frames: + collected_audio = np.concatenate(self.all_collected_frames, axis=0) + duration = len(collected_audio) / RATE + Logger.info(f"返回已收集的音频数据,时长: {duration:.2f} 秒") + return True, collected_audio + else: + return True, None + + # 清理已检查的帧,保留一些重叠 + overlap_samples = int(samples_per_check * 0.2) # 20%重叠 + remaining_samples = 0 + remaining_frames = [] + + # 从后往前保留重叠部分 + for frame in reversed(self.audio_frames): + if remaining_samples + len(frame) <= overlap_samples: + remaining_frames.insert(0, frame) + remaining_samples += len(frame) + else: + break + + self.audio_frames = remaining_frames + + return False, None + + finally: + stream.stop() + stream.close() + self.monitoring = False + + async def cleanup_resources(self) -> None: + """清理资源""" + self.monitoring = False + # 清理可能残留的临时文件 + for file_path in self.cache_dir.glob("vad_check_*.opus"): + try: + file_path.unlink() + except Exception as e: + Logger.warning(f"清理临时文件失败 {file_path}: {e}") diff --git a/src/chatbot/chatter/voice_recorder.py b/src/chatbot/chatter/voice_recorder.py index fc55f84..2c935fc 100644 --- a/src/chatbot/chatter/voice_recorder.py +++ b/src/chatbot/chatter/voice_recorder.py @@ -1,3 +1,4 @@ +# type: ignore from __future__ import annotations import asyncio @@ -221,3 +222,195 @@ async def save_and_process_full_audio(self, full_audio: NDArray[np.float32]) -> return response.strip() Logger.warning("ASR识别返回空结果") return None + + # 在 VoiceRecorder 类中添加这个方法 + + async def record_with_voice_trigger(self) -> None | str: + """在检测到语音活动后开始录音""" + Logger.info("检测到语音活动,开始录音...") + + # 重置录音状态 + self.recording = True + self.audio_frames = [] + self.all_audio_frames = [] + self.segments_to_process = [] + self.audio_length = 0 + + # 计算每个片段的样本数 + samples_per_segment = int(RATE * SEGMENT_DURATION) + + # 启动录音流 + stream = sd.InputStream(samplerate=RATE, channels=CHANNELS, callback=self.audio_callback) + start_time = time.time() + stream.start() + + # 立即开始VAD处理(不需要初始等待时间) + segment_processor = asyncio.create_task(self.process_segments()) + segment_index = 0 + segment_buffer: list[NDArray[np.float32]] = [] + + # 录音主循环 + while self.recording: + current_time = time.time() + self.audio_length = current_time - start_time + + # 检查最大录音时间 + if self.audio_length > MAX_RECORDING_TIME: + Logger.info(f"达到最大录音时间 ({MAX_RECORDING_TIME}s),停止录音") + self.recording = False + break + + # 收集当前帧 + current_frames = [] + if self.audio_frames: + current_frames = self.audio_frames.copy() + self.audio_frames = [] + self.all_audio_frames.extend(current_frames) + segment_buffer.extend(current_frames) + + # 处理音频片段 + total_samples = sum(len(frame) for frame in segment_buffer) + if total_samples >= samples_per_segment: + frames_for_segment: list[NDArray[np.float32]] = [] + segment_samples = 0 + + for i, frame in enumerate(segment_buffer): + frames_for_segment.append(frame) + segment_samples += len(frame) + if segment_samples >= samples_per_segment: + segment_buffer = segment_buffer[i + 1 :] + break + + if frames_for_segment: + segment_data = np.concatenate(frames_for_segment, axis=0) + self.segments_to_process.append((segment_data, segment_index)) + Logger.info(f"添加片段 {segment_index} 到处理队列") + segment_index += 1 + + await asyncio.sleep(0.1) + + # 停止录音 + Logger.info("停止录音...") + stream.stop() + stream.close() + + # 等待片段处理完成 + self.processing_segments = False + await segment_processor + + # 处理完整音频 + if self.all_audio_frames and len(self.all_audio_frames) > 0: + if self.audio_frames: + self.all_audio_frames.extend(self.audio_frames) + + full_audio = np.concatenate(self.all_audio_frames, axis=0) + full_duration = len(full_audio) / RATE + Logger.info(f"完整录音时长: {full_duration:.2f} 秒") + return await self.save_and_process_full_audio(full_audio) + + Logger.warning("没有收集到音频数据") + return None + + # 在 VoiceRecorder 类中添加这个方法 + + async def record_with_pre_collected_audio(self, pre_collected_audio: NDArray[np.float32]) -> None | str: + """使用预收集的音频开始录音""" + pre_duration = len(pre_collected_audio) / RATE + Logger.info(f"开始录音,包含预收集音频 {pre_duration:.2f} 秒") + + # 重置录音状态 + self.recording = True + self.audio_frames = [] + self.all_audio_frames = [] + self.segments_to_process = [] + self.audio_length = 0 + + # 将预收集的音频添加到总音频中 + # 需要将预收集的音频分割成帧格式 + frame_size = 1024 # 假设的帧大小,可以根据实际情况调整 + pre_frames = [] + for i in range(0, len(pre_collected_audio), frame_size): + end_idx = min(i + frame_size, len(pre_collected_audio)) + frame = pre_collected_audio[i:end_idx].reshape(-1, 1) # 确保是正确的形状 + pre_frames.append(frame) + + self.all_audio_frames.extend(pre_frames) + Logger.info(f"已添加预收集音频帧数: {len(pre_frames)}") + + # 计算每个片段的样本数 + samples_per_segment = int(RATE * SEGMENT_DURATION) + + # 启动录音流 + stream = sd.InputStream(samplerate=RATE, channels=CHANNELS, callback=self.audio_callback) + start_time = time.time() + stream.start() + + # 立即开始VAD处理 + segment_processor = asyncio.create_task(self.process_segments()) + segment_index = 0 + segment_buffer: list[NDArray[np.float32]] = [] + + # 将预收集的音频也加入到分段处理中 + segment_buffer.extend(pre_frames) + + # 录音主循环 + while self.recording: + current_time = time.time() + self.audio_length = current_time - start_time + pre_duration # 包含预收集音频的时长 + + # 检查最大录音时间 + if self.audio_length > MAX_RECORDING_TIME: + Logger.info(f"达到最大录音时间 ({MAX_RECORDING_TIME}s),停止录音") + self.recording = False + break + + # 收集当前帧 + current_frames = [] + if self.audio_frames: + current_frames = self.audio_frames.copy() + self.audio_frames = [] + self.all_audio_frames.extend(current_frames) + segment_buffer.extend(current_frames) + + # 处理音频片段 + total_samples = sum(len(frame) for frame in segment_buffer) + if total_samples >= samples_per_segment: + frames_for_segment: list[NDArray[np.float32]] = [] + segment_samples = 0 + + for i, frame in enumerate(segment_buffer): + frames_for_segment.append(frame) + segment_samples += len(frame) + if segment_samples >= samples_per_segment: + segment_buffer = segment_buffer[i + 1 :] + break + + if frames_for_segment: + segment_data = np.concatenate(frames_for_segment, axis=0) + self.segments_to_process.append((segment_data, segment_index)) + Logger.info(f"添加片段 {segment_index} 到处理队列") + segment_index += 1 + + await asyncio.sleep(0.1) + + # 停止录音 + Logger.info("停止录音...") + stream.stop() + stream.close() + + # 等待片段处理完成 + self.processing_segments = False + await segment_processor + + # 处理完整音频 + if self.all_audio_frames and len(self.all_audio_frames) > 0: + if self.audio_frames: + self.all_audio_frames.extend(self.audio_frames) + + full_audio = np.concatenate(self.all_audio_frames, axis=0) + full_duration = len(full_audio) / RATE + Logger.info(f"完整录音时长(含预收集): {full_duration:.2f} 秒") + return await self.save_and_process_full_audio(full_audio) + + Logger.warning("没有收集到音频数据") + return None