From d77c4eda9eaba5e7472670351c09d93701005f96 Mon Sep 17 00:00:00 2001 From: MrXnneHang Date: Fri, 13 Jun 2025 00:15:54 +0800 Subject: [PATCH 1/7] double --- src/chatbot/chatter/__init__.py | 6 +++--- src/chatbot/chatter/util.py | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/chatbot/chatter/__init__.py b/src/chatbot/chatter/__init__.py index 934d2c3..f499f67 100644 --- a/src/chatbot/chatter/__init__.py +++ b/src/chatbot/chatter/__init__.py @@ -3,7 +3,7 @@ # 音频参数 RATE = 16000 # 采样率 CHANNELS = 1 # 单声道 -SILENCE_THRESHOLD = 1000 # 静音检测阈值(毫秒) +SILENCE_THRESHOLD = 600 # 静音检测阈值(毫秒) MAX_RECORDING_TIME = 30 # 最长录音时间(秒) -SEGMENT_DURATION = 1.5 # 每个VAD检测段的持续时间(秒) -INITIAL_WAIT_TIME = 3.0 # 初始等待时间(秒) +SEGMENT_DURATION = 0.8 # 每个VAD检测段的持续时间(秒) +INITIAL_WAIT_TIME = 1.5 # 初始等待时间(秒) diff --git a/src/chatbot/chatter/util.py b/src/chatbot/chatter/util.py index 3742b95..9971a6b 100644 --- a/src/chatbot/chatter/util.py +++ b/src/chatbot/chatter/util.py @@ -11,7 +11,11 @@ def handle_porcupine_keyword(system_platform: SystemPlatform): key_words = ["你好"] """获取 Porcupine 关键词和关键词特征文件路径""" if system_platform == "linux": - keyword_paths = ["./models/keywords_spotting/你好_linux.ppn"] + keyword_paths = [ + "./models/keywords_spotting/你好_linux.ppn", + "./models/keywords_spotting/派蒙_zh_linux_v3_0_0.ppn", + ] + key_words.extend(["派蒙"]) elif system_platform == "mac": keyword_paths = ["./models/keywords_spotting/你好_mac.ppn"] elif system_platform == "raspberry-pi": From 5f8dcdc1bf89adf8739380484d16bfba0ba879b1 Mon Sep 17 00:00:00 2001 From: MrXnneHang Date: Fri, 13 Jun 2025 02:24:46 +0800 Subject: [PATCH 2/7] update --- src/chatbot/__main__.py | 80 +++++++-- .../chatter/audio_processing_workflow.py | 4 +- src/chatbot/chatter/voice_activity_monitor.py | 162 ++++++++++++++++++ src/chatbot/chatter/voice_recorder.py | 88 ++++++++++ 4 files changed, 316 insertions(+), 18 deletions(-) create mode 100644 src/chatbot/chatter/voice_activity_monitor.py diff --git a/src/chatbot/__main__.py b/src/chatbot/__main__.py index 04085c3..25852d3 100644 --- a/src/chatbot/__main__.py +++ b/src/chatbot/__main__.py @@ -1,13 +1,12 @@ from __future__ import annotations - import asyncio import logging import signal from pathlib import Path - from chatbot.chatter.audio_processing_workflow import initialize_session_state, process_voice_command from chatbot.chatter.voice_recorder import VoiceRecorder from chatbot.chatter.wake_word_detector import WakeWordDetector +from chatbot.chatter.voice_activity_monitor import VoiceActivityMonitor # 新增 from chatbot.config_manager import ServiceSettings, load_settings_file from chatbot.console.logger import Logger, set_logger_debug @@ -18,8 +17,8 @@ def configure_logging() -> None: logging.getLogger(name) for name in logging.root.manager.loggerDict if name.startswith("streamlit") ] for logger in streamlit_loggers: - logger.setLevel(logging.ERROR) # 裸模式(非 streamlit run 交互式)运行会带来很多警告信息,这里将其设置为ERROR级别 - set_logger_debug() # 设置日志记录器为调试模式 + logger.setLevel(logging.ERROR) + set_logger_debug() class VoiceAssistant: @@ -29,33 +28,82 @@ def __init__(self): self.settings = load_settings_file("config.toml", ServiceSettings) self.cache_dir = Path(self.settings.cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) + self.wake_detector = WakeWordDetector( access_key=self.settings.access_key, system_platform=self.settings.system_platform ) self.voice_recorder = VoiceRecorder(cache_dir=self.cache_dir) + self.voice_monitor = VoiceActivityMonitor(cache_dir=self.cache_dir) # 新增 + + self.is_awake = False # 跟踪唤醒状态 + self.keep_alive_timeout = 10.0 # 保持唤醒的超时时间(秒) async def start(self) -> None: """启动程序:监听唤醒词并录音""" running = True while running: - # 监听唤醒词 - wakeup_success = await self.wake_detector.listen_for_wakeup() - if wakeup_success: - # 唤醒后开始录音 - asr_result = await self.voice_recorder.record_and_process() - if asr_result: - # 处理ASR结果 - await process_voice_command(asr_result, self.cache_dir) - Logger.info("录音已完成,重新进入唤醒词监听状态") - await asyncio.sleep(0.5) + if not self.is_awake: + # 未唤醒状态:监听唤醒词 + wakeup_success = await self.wake_detector.listen_for_wakeup() + if wakeup_success: + self.is_awake = True + Logger.info("已唤醒,进入对话模式") + # 首次唤醒后立即开始录音 + await self.handle_conversation() + else: + Logger.warning("唤醒词检测失败") + running = False else: - Logger.warning("唤醒词检测失败") - running = False + # 已唤醒状态:保持活跃并监听语音活动 + await self.keep_alive_and_listen() + await self.stop() + async def handle_conversation(self) -> None: + """处理一次完整的对话""" + # 如果是首次唤醒,使用原有的录音方法 + # 如果是语音活动触发,使用新的录音方法 + if hasattr(self, "_first_conversation") and not self._first_conversation: + asr_result = await self.voice_recorder.record_with_voice_trigger() + else: + asr_result = await self.voice_recorder.record_and_process() + self._first_conversation = False + + if asr_result: + await process_voice_command(asr_result, self.cache_dir) + Logger.info("对话处理完成") + else: + Logger.warning("未获取到有效的语音输入") + + async def keep_alive_and_listen(self) -> None: + """保持唤醒状态并监听语音活动""" + Logger.info(f"保持唤醒状态,监听语音活动(超时时间:{self.keep_alive_timeout}秒)") + + try: + # 监听语音活动,带超时 + voice_detected = await asyncio.wait_for( + self.voice_monitor.listen_for_voice_activity(), timeout=self.keep_alive_timeout + ) + + if voice_detected: + Logger.info("检测到语音活动,开始新的对话") + await self.handle_conversation() + # 对话完成后继续保持唤醒状态 + else: + Logger.info("未检测到语音活动") + self.is_awake = False + + except asyncio.TimeoutError: + Logger.info(f"{self.keep_alive_timeout}秒内未检测到语音活动,进入睡眠状态") + self.is_awake = False + except Exception as e: + Logger.error(f"语音活动监听出错: {e}") + self.is_awake = False + async def stop(self) -> None: """停止程序""" await self.wake_detector.cleanup_resources() + await self.voice_monitor.cleanup_resources() Logger.info("程序已停止") diff --git a/src/chatbot/chatter/audio_processing_workflow.py b/src/chatbot/chatter/audio_processing_workflow.py index f658841..86d24cf 100644 --- a/src/chatbot/chatter/audio_processing_workflow.py +++ b/src/chatbot/chatter/audio_processing_workflow.py @@ -72,7 +72,7 @@ async def process_voice_command(prompt: str, cache_dir: Path) -> None: # 创建任务 producer = asyncio.create_task(sentence_producer(prompt)) tts = asyncio.create_task(tts_worker(cache_dir / "tts")) - play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) + # play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) # 等待生产者完成 await producer @@ -83,4 +83,4 @@ async def process_voice_command(prompt: str, cache_dir: Path) -> None: # 取消剩余任务 tts.cancel() - play.cancel() + # play.cancel() diff --git a/src/chatbot/chatter/voice_activity_monitor.py b/src/chatbot/chatter/voice_activity_monitor.py new file mode 100644 index 0000000..5f465c5 --- /dev/null +++ b/src/chatbot/chatter/voice_activity_monitor.py @@ -0,0 +1,162 @@ +from __future__ import annotations +import asyncio +import time +from typing import TYPE_CHECKING +import numpy as np +import sounddevice as sd +from chatbot.api.async_api import async_file_to_opus, async_get_vad_response +from chatbot.chatter import CHANNELS, RATE +from chatbot.console.logger import Logger +from chatbot.tools.timed_helper import get_time_tag_with_millis +import soundfile as sf + +if TYPE_CHECKING: + from pathlib import Path + from numpy.typing import NDArray + + +class VoiceActivityMonitor: + def __init__(self, cache_dir: Path): + self.cache_dir = cache_dir / "vad_monitor" + self.cache_dir.mkdir(parents=True, exist_ok=True) + self.monitoring = False + self.audio_frames: list[NDArray[np.float32]] = [] + self.check_interval = 1.0 # 每秒检查一次 + self.min_audio_length = 0.5 # 最小音频长度(秒) + + def audio_callback( + self, indata: NDArray[np.float32], _frames: int, _time: dict[str, float], status: sd.CallbackFlags + ) -> None: + """声音设备的回调函数""" + if status: + Logger.warning(f"音频回调状态: {status}") + if self.monitoring: + self.audio_frames.append(indata.copy()) + + async def save_audio_segment(self, audio_data: NDArray[np.float32]) -> Path: + """保存音频片段为Opus文件""" + time_label = get_time_tag_with_millis() + temp_wav_path = self.cache_dir / f"vad_check_{time_label}.wav" + opus_path = self.cache_dir / f"vad_check_{time_label}.opus" + + # 保存为WAV + sf.write(temp_wav_path, audio_data, RATE, format="WAV", subtype="PCM_16") + + # 转换为Opus + await async_file_to_opus(temp_wav_path, opus_path) + + # 删除临时WAV文件 + temp_wav_path.unlink() + + return opus_path + + async def check_voice_activity(self, audio_segment: NDArray[np.float32]) -> bool: + """检查音频片段是否包含语音活动""" + if len(audio_segment) == 0: + return False + + try: + # 保存音频片段 + audio_path = await self.save_audio_segment(audio_segment) + + # 发送到VAD服务 + vad_result = await async_get_vad_response(audio_path) + + # 清理临时文件 + if audio_path.exists(): + audio_path.unlink() + + if not vad_result: + return False + + # 检查是否有语音活动 + timestamps = vad_result.get("timestamp", []) + has_voice = len(timestamps) > 0 and any(len(ts) > 0 for ts in timestamps) + + if has_voice: + Logger.info(f"检测到语音活动: {timestamps}") + + return has_voice + + except Exception as e: + Logger.error(f"VAD检查出错: {e}") + return False + + async def listen_for_voice_activity(self) -> bool: + """监听语音活动,返回是否检测到语音""" + self.monitoring = True + self.audio_frames = [] + + # 计算每次检查需要的样本数 + samples_per_check = int(RATE * self.check_interval) + min_samples = int(RATE * self.min_audio_length) + + # 启动录音流 + stream = sd.InputStream(samplerate=RATE, channels=CHANNELS, callback=self.audio_callback) + stream.start() + + Logger.info("开始监听语音活动...") + + try: + while self.monitoring: + # 等待收集足够的音频数据 + await asyncio.sleep(self.check_interval) + + if not self.monitoring: + break + + # 检查是否有足够的音频数据 + total_samples = sum(len(frame) for frame in self.audio_frames) + + if total_samples >= min_samples: + # 提取音频片段进行检查 + frames_to_check = [] + samples_collected = 0 + + for frame in self.audio_frames: + frames_to_check.append(frame) + samples_collected += len(frame) + if samples_collected >= samples_per_check: + break + + if frames_to_check: + audio_segment = np.concatenate(frames_to_check, axis=0) + + # 检查语音活动 + has_voice = await self.check_voice_activity(audio_segment) + + if has_voice: + Logger.info("检测到语音活动!") + return True + + # 清理已检查的帧,保留一些重叠 + overlap_samples = int(samples_per_check * 0.2) # 20%重叠 + remaining_samples = 0 + remaining_frames = [] + + # 从后往前保留重叠部分 + for frame in reversed(self.audio_frames): + if remaining_samples + len(frame) <= overlap_samples: + remaining_frames.insert(0, frame) + remaining_samples += len(frame) + else: + break + + self.audio_frames = remaining_frames + + return False + + finally: + stream.stop() + stream.close() + self.monitoring = False + + async def cleanup_resources(self) -> None: + """清理资源""" + self.monitoring = False + # 清理可能残留的临时文件 + for file_path in self.cache_dir.glob("vad_check_*.opus"): + try: + file_path.unlink() + except Exception as e: + Logger.warning(f"清理临时文件失败 {file_path}: {e}") diff --git a/src/chatbot/chatter/voice_recorder.py b/src/chatbot/chatter/voice_recorder.py index fc55f84..0cb439a 100644 --- a/src/chatbot/chatter/voice_recorder.py +++ b/src/chatbot/chatter/voice_recorder.py @@ -221,3 +221,91 @@ async def save_and_process_full_audio(self, full_audio: NDArray[np.float32]) -> return response.strip() Logger.warning("ASR识别返回空结果") return None + + # 在 VoiceRecorder 类中添加这个方法 + + async def record_with_voice_trigger(self) -> None | str: + """在检测到语音活动后开始录音""" + Logger.info("检测到语音活动,开始录音...") + + # 重置录音状态 + self.recording = True + self.audio_frames = [] + self.all_audio_frames = [] + self.segments_to_process = [] + self.audio_length = 0 + + # 计算每个片段的样本数 + samples_per_segment = int(RATE * SEGMENT_DURATION) + + # 启动录音流 + stream = sd.InputStream(samplerate=RATE, channels=CHANNELS, callback=self.audio_callback) + start_time = time.time() + stream.start() + + # 立即开始VAD处理(不需要初始等待时间) + segment_processor = asyncio.create_task(self.process_segments()) + segment_index = 0 + segment_buffer: list[NDArray[np.float32]] = [] + + # 录音主循环 + while self.recording: + current_time = time.time() + self.audio_length = current_time - start_time + + # 检查最大录音时间 + if self.audio_length > MAX_RECORDING_TIME: + Logger.info(f"达到最大录音时间 ({MAX_RECORDING_TIME}s),停止录音") + self.recording = False + break + + # 收集当前帧 + current_frames = [] + if self.audio_frames: + current_frames = self.audio_frames.copy() + self.audio_frames = [] + self.all_audio_frames.extend(current_frames) + segment_buffer.extend(current_frames) + + # 处理音频片段 + total_samples = sum(len(frame) for frame in segment_buffer) + if total_samples >= samples_per_segment: + frames_for_segment: list[NDArray[np.float32]] = [] + segment_samples = 0 + + for i, frame in enumerate(segment_buffer): + frames_for_segment.append(frame) + segment_samples += len(frame) + if segment_samples >= samples_per_segment: + segment_buffer = segment_buffer[i + 1 :] + break + + if frames_for_segment: + segment_data = np.concatenate(frames_for_segment, axis=0) + self.segments_to_process.append((segment_data, segment_index)) + Logger.info(f"添加片段 {segment_index} 到处理队列") + segment_index += 1 + + await asyncio.sleep(0.1) + + # 停止录音 + Logger.info("停止录音...") + stream.stop() + stream.close() + + # 等待片段处理完成 + self.processing_segments = False + await segment_processor + + # 处理完整音频 + if self.all_audio_frames and len(self.all_audio_frames) > 0: + if self.audio_frames: + self.all_audio_frames.extend(self.audio_frames) + + full_audio = np.concatenate(self.all_audio_frames, axis=0) + full_duration = len(full_audio) / RATE + Logger.info(f"完整录音时长: {full_duration:.2f} 秒") + return await self.save_and_process_full_audio(full_audio) + + Logger.warning("没有收集到音频数据") + return None From 7fd1b2a901e2c5145ba2be10af6fb329963ccee7 Mon Sep 17 00:00:00 2001 From: MrXnneHang Date: Fri, 13 Jun 2025 02:24:51 +0800 Subject: [PATCH 3/7] update --- ...346\264\276\350\222\231_zh_linux_v3_0_0.ppn" | Bin 0 -> 10268 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 "models/keywords_spotting/\346\264\276\350\222\231_zh_linux_v3_0_0.ppn" diff --git "a/models/keywords_spotting/\346\264\276\350\222\231_zh_linux_v3_0_0.ppn" "b/models/keywords_spotting/\346\264\276\350\222\231_zh_linux_v3_0_0.ppn" new file mode 100644 index 0000000000000000000000000000000000000000..03ad5fa99472a68b129836c2bf63ececa4dba4aa GIT binary patch literal 10268 zcmV+%DC5^W%){~EUhKpXql-6(6UFe8Q~SJ|AwNoI>25&jPdi@dP_Bi8lzC*WN2v^} z_L_wU&V=p9nI-Bk1i02o)bTmapK6_^ zuO6E~KfztsUAAg{&Id{L1rnp**4h%%QcN05Wu!zei2{PX5_zd6W;$ML&OyaDuY28WGV#1-9N5;a|B9hZ47nTATrF}Yn0n0LhUV+)tF zVNoLDo&%q9Qz4wIdH*`EGxYk9MYF~4kZgi(ko-uYl%D3rrEZf|(Xaqa$X z`O=5p@V-uy0ry$J_v6aa*I8A~gbW4j+Q1AbiyILaIHn-1a!!^mt?_phPeDgfQt&{LsB z$`4`gS-$>xniw)98i5|$xbtjmZ#&5gH$TYTK!fmgTtvT?P`&cXr+iZ%xU9qnfSDrm zz`o=Jtr$$!c{1e*qJ1Ss;ulK^0CS2G2|2jTt?Sc&exH;@I{rSoyr?&Pc4e20^zYe& zS*eq(tciof2&1wS8|Wfi#KsT*AXtgU1~bR)z(DIoXK+SeEY^6+RSEuHVZCuBTb zW*_b!etX{pPB8#RNM^j|9I8?ke}&;U^o@_{Qe=3y@KbL9<=Pv#&m zMJ|5x;G}&nbOp~|((+mY43++Rgb|?Q#@QyCAr3E!gxD3%r^GoB7Xyu8FDzjRHfOaA6;ugEHjzZ9 zS}=;VpA3j5my<}uK{jhxei4H>?|i0E+9JG4mPM7~-|P8dt&QSo5f#7i305sva`ITw zVmc;#R0;q?!^QnA9C*v@8}y}=JVRZ_>9a6m_V94$+P75em7H+}UVC7JU4?4P(7|PS zHS&#FeNbA(FOTtmY0NN_y|Pvpf4I#8!$zLVaQXwy?)B$2$%2_ZqpAGZB%UvrAZD=2 zjFG>T0~TfYqtr$4mk(0POy@i8CEQs4MY2Yc($AZ3Y|9Z^Z5y^gZ}OzW#)KS?gugcB zQO6zy#z@Y)Qd|p(&g4im0cQ97%kOakC2MUk}(^TO&&jB(K^Y*0->M{Ff^yZ{p9VB$U5SQV z)Q#LOE#!^CDH({oq2h)5*f%cPA~~H+#FPQ=j){Bz)Rf&S=y&blqdIKs6H6>mz2xkJH(+D_7 z(`#IMQWjx)R@>wm`n-XU>X_JA#P{KyC-Qki9e*lIIq{&0lL1hbz*`k*lLr`qn85VO zSe%OCUy)m}G2zp+&EFf?ci@2E4*Z&G=p6dODe51j^RIt7Y2#A=r4yI}d`oxVhs&Y$ z7Vh{CAu5XOzp;LPFmXx_!;Z(#q6SOyiXOh{D6g3)I6R2VWJ+SM!yObGJv}#lZQ-cU-@Tvb-}%36jy3|V z-x)a+@0+r{!Ml5(rjLzTTBas))9g_^Ky&SNP@cLg;(D!59Z)D6Jf_o@uPU>oN~KmChZ>^|O`k{QC%O>QIGY(35NGgU^Q9P1NoK2+pn%&_79!@1)={Kf1JI0a~q7;tEpMm9!JV^OmV z@G*8#b0sLNmuHj9X=)eGZu9fOc|;=0qP6{Ga)oEaQ^iOQ`JpbixRgD0j|ummpC|s- zk7T1qh%pF|hvt}89H(dgQU9&6G<_Wn<-og6iy8wcumVMIVT&yLefn`qN{AEUKajx@7E1oh6rcc3&T*i_T!ws>;#Zed=S-4p-l2nF*(M_h|^o2gCBjOrf&D)0n zVFK${fhJklwxODl$0N}(m#66IL|?Ulm~VQ<+;WyfX^d`tk>% zwYUc&?WMk2qjI;yx-|z};x)6F<$Ez`QuSXz2Yf4~j+0EsfS5~uo145!uK;bed)0&_JE95i(QheIe?8f%^I^#tHDExR`LN)DtMcu z-{wwZUj~Mo%`^v`MCFXj|IXv{vW!~B4DbeFg4UkpVz&glwe0Y}x+3e52d2oM&qAuS zuadm@nZ(#By=XMKpNlB4$OY*PEJ@qYu>wcVHrqQGP8ZmTgnW-%%ynW+6t6 z1kGdYO~}G!64N?$63y31H-Qa&ts1?6 ziHFj#8IqNWB%Y3@#qqbGVH(=n^`#^>o)!lIv_I-fYMF>w_c|WKpM4|}rgA90DwVd=*I#&Hq7~>gLdgvl;-~8Vrn>kY z%CQP4h@GY`+xLvb!rfF)h^&R;w0{03jkXc|Z~(WiRAq++%k9v5=jg@n#`NXy&|9hB zb{fX479np0rpk%2oEAA&>nZU0N4RZig#bD6qHwfh)3mKTp_xL_izS{gx^bY)3OF(4 zd?JKBixNt_&jS?+O{LUCKC-3Y^wDX*VixgXc*$Ay^AP3nrjF+K9$pE$rH+E29=&=( zL=l+}R?r62PrtsvotvEI1SXH34G6C$ZAve=30e|IvOQA_M5-g$xioiY*sxfNHHpQ$ z2(x*5Z-dFZ{s3JN4tV;U=NK^lCeO~1FQf}$EUOSWShHj^3Pp-{t`twstr*Df=y z$%`V_fIFootk}?3Ad-Lu^y#UPe$l&8-@)aI+3bk=W~oe+i=ZhKe;=uULxE$lnHYlR z+&JMW_;^vZwms1BB$D1+PXJf8hfF0Tq?`+lK$1u~qm4s0==_Ceqk`rmJ@+;Ydl6D8 za%*w-PP26EOuuAP*W-!-?O=J-I9w`+KVWGFB~>oU{Za<5>g?nS3Ne+YL9R?vS2!-> zRm3O_WTk%(R)D(=M07_r+}Y*yY-l_#Bnc6e6ffm!shHS2Z$w%v#{dxS_Ef*zxB_0G2OEpP&(<8Z&G+B^ z)%?V8Qj=s829GNG5-|y5=%>IqNV4|>X%Ckew z{TPIGceFDsXt3*0()cLD!A^y$Ub=sMhJxGDp60{s-yvRY?^~l79obT)5=5`EuSP~=ssw{-}@$AN{r{VEV%kZMcv4~T-`qD zNL!co`Kb*Q_At%tH7Shf(KPnG?o%F88K1T9zXYTxsI?k+f|%8>+1JqkKmk#Do10xk z)2K@l+6Pz1(diY2ZQEa+>G|6#(jpMEMp`MF%~j9Y+)$ne)^g0F)iws{Xt>9#Er_GE zC@Gslfem(zL0WpDUr^h)SuRcd2f92Yz}b2B zrjh!xD$al>!po2+OVvEoZ@Xm@nheyzHX^x;{i>uHdf(7@C)im1f;*`=8+k9TR;rfW z1LpUQByACAcXxocA%4e>FNmqNYqU2=jYED!3 zkTx1F;jP8J2GuB2C@vmf%NWjNq>t3sCZr+yXYg@0k4>Hgu%c$~o~X$m8GjY5tx$SCQi?t<1dxni}*!yZ?IyiE5w_2=EgIvLhb5Uf=5>^u+7Kqw}s zx49u;_WYon|CloAkrgfNZ+K)J^z{?ouG_p`PlgKf2l?^jXlFmCC;7Z zg#hOJ^2!?x-k#E}brV{OOx~;Ei6^8z=<#Tk!Fod+Oa%j*52rOUW&wB}i4J4Q9t1ED zABmw+g7#x~-Y5^&B*P?)S~)&CgFo-tpw2*DH*pyk76V4yU~N@U;edj3;d`$88aMQF zFNsGQJv959oOw_9d-E*{Wwu!K8&(ynh#U^qe8b|*J)y>SZo6+(Uyl^B;Ix%IGV=)(5M*r~?#rn3sIszHItH^+ zep#%%l4=cW{ZTbwkR-^B_sXSPmZ1}kqzQl7A1ScX{Jzh+=lufpH)Ht_q_=2($%8R9 zH(I1T{pSmc3ktjpb}keRs#nRunhG+NDuSKDNFZH3TLJLdC5*gn%CX!?VhMuhQ>PJt z*HHFbG#ksgS0;eI&MMF${zq-I9}imTS@kmfnpU6eNJ0>tu5c3L&Tav>dAFRm13)u0 ze_rNEZ+K--n>vz9QHEr!@YnjiYK)Fqae53!>`|q&C57^!;ha%2idD?2n$;+TpE@s6^DiN|&c$v!9DMt|OOxahcDbBz$)NgID zKCNJXo@8tJ>wgY;X;7$8yJFS}Hnd@+SSLWyCoFL%V?_aOci*u{Kz?BC7n=cjIpA>2 z-x?eL9qT(Ov5+G1$d7Jf+pP}_+R{3BJ1?C>J2EThcu#b9Y`fpA-=ktXI6Q+0)+{k0 zeh3#=ss|y84$y#6-Ydd;x@3$J{${daDR5L zP2BMo%f|uMQvnyr8__3q_SD5sv`(y%exD7{wolGvhu9y99PN>7W5p2 zbvi5y>#BVHao8u+*#8G!OJ~G-`F=2M1ZlPFDalw(2{@6Ei4#K%xbP*8krNBL>3Lt6*qoBP*5-Hn?^3|w{lZz#blG&_7HnG?z2oT-NbffPF}6+Z+1 zKK8728?3thf7yjbJ7rG7IVyC$Omvf}`(1zGVs&IEXC_IaU6BVA>a6?_ehltRur_W! zTCiNu{jat;8gd-BGsCcE&>rByRaX;cTiBiCcp=`|M8kk8J{4l9#k;7vl{Gd&om~vy z>JT>QI|~&w2Q@hnu-)R`Aa$N(Y@H~22CD8ILWXDjNIjnTEdEx~1y`G$@fdUkh zRLlejcudPkBXm75ja4)Q;pOrj@4@Hn&@{!UB0^(;(;;43Lro7&ykmfll<0j_fHn1H z2q037X_Z+FL)tJ0was*bGu@Ia&!s-Y&vW`E5Ez|njy}y*ivVCp?IWi7vk0~8DRCDAf+|UZ#LK}; zYq{Qq)hP1uI)J7X9t1`@P)eWqjXZKu+DZUT{Q9~QD0xtmBGLy~nr3)pnY@^oyF)1$`$GXQ1{{BBPRj|SF4@_CTL9?s^P1-_|D z^n=gu^3ZtDnp;3&%9lt#&FszcZ-#@_350bv=>Yf>cYNlsyoW|D*5=1rt(8_6_h5-B;Z!p$BKrRS!$Ib88EC7iS@zsmQuW{K+QcZzAHY-6f$8 z|L>?dV77bmTkE_+Tx^P6CN1Io&h@|jm2PT^UY5Jo7$Os|S}OysK)@CwFWC3_X&s< zzTZ6qLpwU6Is4o5IkkcYmC$=jE4BZvqX_h;uxSB7i$!c7O~6NznYZm!fGztpNfOCr z`ykBmV(Y)2V3Sr5i{<$*a)81YT@*m*uVH2{HGIw*cba` zcSRoAz#2-cnvW+EGp9!)ssz4w=1M9$-M?i(y3zQ@L~b#}hu3KHbo8Vr8>wSH8)= z{lVGHE_dC{k?`Cge>mGDIr_sV?La;GDmq2TU5sS zN>Hu9kFYjZiu~#z2B+_x){7HHb1F#`N2g!F0|}!QJ~B8C3c9RDe3@MQ_r8Mt84N+P z?n{q6J)BP0r+Cfmt@Wq}me;qz)LZ~|PN%gQ7Vqm-Fq_t5GnX}xG#Xp^_@(NAvljX> zlCX>}QY~T2!gq%0hk6G({`BI|2Q7)6Obi&Ga?*2!7dv?vSQ)gpGTIIX`b$cN49$Om9$u3;enfIe|Jp{Ia>K` z#v)LvdUwNUADX*i;ny!;_I2p9+ke&T`U6p=3^V+V8ko<6>wkcaF5avaJ{)hj*lvU3 z>#@-^r{D1RkEo^~=iNf%AWF*UiqEIdMvVZZ++~=LR`;EKr9s%>!Yia`mzu&Z>Xs3E zo{gQ9no?RyI_p?S2;~wl10z7rqH3C>r$nP)zQ=}CQcPVVtJQqGl>jLb*C!j zg!PHdJThav>9}mCeqi_;Hu`0`Kf=sQBE=;^sQkt|Lt10FCKwq>nr`96F3AENZ=%;@ z$dRhHl@(v$L{fW@6dkrIQ;#W|)WeS}`?dn*--4 z;rZ!Cnwnn}?)qQ65m$8-Al9_^%GnkVgf!qs)i8JL_&E!b8g70+aOWMty+SXB1%5U% zPrF_#5ZS&riWK?;&vc`!#aJx^w;qzKuAT&=uHj?tGlO*(7;;JjyWVVZfD?%0VWKhM zs28z!==Wd<4-r79qKH>#vB(rD;8bHFpmEQ-0BN zXf+n~!FmG0qHg;~xkp#AedNtnBOoUq2LkvdAVB_Cv#MwPP-f8yq)5jGdb^315I2$5 zO$%3!NM&yDc8I+a7lMMG88rLV$@Hh%eGIH1?uQ6}6(o=2{Tu$Sg8-^c)_jVgeqjd6 z-j>_F?F2ggoVARIvQg{1^`@O}$X~6Yc$CFZ`hD$r(%F#EHb24(ow{|@D8fM32OU1Q zC`dt1uB58Ygn8yGT{}^? zK%ii%;5vcb{(95UTlsHEt4V^<&qMK*wFQ$)wZazJ+z|e86|XQAYS1! z#1W9w9ht!MO@RH246{oaBTHH~KJ+72KfkIU287F((rQ9h%P}7?Zdg9-qnq8p?8nb) z1SjCqP1{d1wNL(srzOQSX$G`o8U6}lq_eE zNCG}o%zX+$<@#ROFseat^Qpg_>ngf{9la&hwL8;?u=4oJ@b+ut5qP4M@HbT~Go_(58XJf-VA9vZw>#uOE3@8s`mdY`NWrBmwP!ZvC z<;G2dr}SCdjiHfWCM`^#oHF^B+qD~v>jvt<+UY9>p|P#`1gVB4E8X_09v=+}`CzW1 zN2q3(6CJ%C)mHmvgeZbH5c+4#c=6V?T4NMyLN>rsv1Vv(->8BhDnScc(%!;RhpqY{ z@l3V=WQYrMC>m4!i)rl3JgY(ZFJ@FHEjDNR5t+25ALO!s#!m6pDX{Ry!qr3C?&|>KTR-V7gKOaj(*R(BZOTO3;b^Zh zxe=ibM3uJSxm(8|@sw}>0!kH_Lv?(&XX4xe3^b*btaqj5mdAZD1%0VSf>wGDE=;v- z-$NK6S=yfb*bOmrfK>w|uvEOP)D4L<Ks5 z$WJEqfV$vmB&7qi(#guc* zQeWzPfNkZwGeOQ`3$!~+W-Oj!kdY4oevV_v@mELN-WVKBqxG`-h!Fq2npcl*yUa*R zsh1@XIaTFZQZOEwdgJ6#LX=lzTRhU-T4+f$U(bDa_ACz9N3R;r(73d`Z9bL1 zA-$hb*o!q|0QbcOB+jwvV5cvW--*p@- zIp9&Ui#8U*Zo7~-oU(=Dg7K3xI%KuLAbWs;hpG3?2$Nf}N?1Gz%tk~)%(wKEfFkZ` z6l>v-+~4U%K9-Qpt+j0%{@ z?Ug%?>nHH3=$A16y}-U>5fvx6LI`8bTiEuwRqEH)L~q*(=3bV~ERFkoA};sx&@WHW z0$lK;tU>}^5nS}YcBHQmBnYs*Kk7KHC+^5$_9)F!cX0hrKK+oH8HyoFd+|J+2$~Fc zgb}@Av=@T34+kSc)f5EFMX-vpg7?d+I=`KHfikEZ#WYghF`4DF7g#Z6%)}x(u32Mz zG0S=jD{-&XbPh@3!zW!C2J)BEcKLhi3&b_o%7I1d&|A zOXyn@5Co?OG=Y1{gu2(m=5f|~Fr){}NgI*Nut!32e;Ww0op(oLL!D${amqjxl9z{@ zJ>JKZnBPhLEyVy3FNiKUMs=n--4B4nPRV$8X>>tefmLd`hiA=(8w>Lt*3<%d>Na%t ztTev2g~Y}J9rzycpVN(K*oS522cg2Q=j9~cX}Cvq-6R|j2W{+=1p@?iRLtn9>z{p& z1HA$(fGvIIqy5VsBcnrq-CtY#*~7x~dO-+^evIL0KKVo zkG9=-fzj<%aU8BDx>Ilp~#0nU}-7DOW%Fu#k4l%suoVCTBmXBoe( zfvYPdxcV#+yGX73qBYX?KX>dMF4cpzbpv*_m0||cY5Y}QZXGi>>{g7B*p(gD^g+u@ z@JA#th4N8qymqQu&8iM5qGLnQ7n+i=-C#O&q@n$$3{*%j{2&rmZy%o!W`Dk7dZW1M zdH2;K`L zF@&UXxF!FIUI0DF1HVU*Irfgu5VU6JmuK2gq{GQx$es9dh7JZWv%!&$qR~>;{7ijS5zhnglaBzq1wOCBwbnDr-b60`=BE=-LxH=ar5(t5~8)@Yg2%}Iw?dV z-9Pj3dXaJ`Q!sv{#g@pkY Date: Fri, 13 Jun 2025 06:53:31 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E8=BF=9E=E7=BB=AD=E5=AF=B9=E8=AF=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chatbot/__main__.py | 51 +++++---- .../chatter/audio_processing_workflow.py | 4 +- src/chatbot/chatter/voice_activity_monitor.py | 24 +++- src/chatbot/chatter/voice_recorder.py | 104 ++++++++++++++++++ 4 files changed, 155 insertions(+), 28 deletions(-) diff --git a/src/chatbot/__main__.py b/src/chatbot/__main__.py index 25852d3..95ec497 100644 --- a/src/chatbot/__main__.py +++ b/src/chatbot/__main__.py @@ -1,12 +1,14 @@ from __future__ import annotations + import asyncio import logging import signal from pathlib import Path + from chatbot.chatter.audio_processing_workflow import initialize_session_state, process_voice_command +from chatbot.chatter.voice_activity_monitor import VoiceActivityMonitor # 新增 from chatbot.chatter.voice_recorder import VoiceRecorder from chatbot.chatter.wake_word_detector import WakeWordDetector -from chatbot.chatter.voice_activity_monitor import VoiceActivityMonitor # 新增 from chatbot.config_manager import ServiceSettings, load_settings_file from chatbot.console.logger import Logger, set_logger_debug @@ -59,36 +61,25 @@ async def start(self) -> None: await self.stop() - async def handle_conversation(self) -> None: - """处理一次完整的对话""" - # 如果是首次唤醒,使用原有的录音方法 - # 如果是语音活动触发,使用新的录音方法 - if hasattr(self, "_first_conversation") and not self._first_conversation: - asr_result = await self.voice_recorder.record_with_voice_trigger() - else: - asr_result = await self.voice_recorder.record_and_process() - self._first_conversation = False - - if asr_result: - await process_voice_command(asr_result, self.cache_dir) - Logger.info("对话处理完成") - else: - Logger.warning("未获取到有效的语音输入") - async def keep_alive_and_listen(self) -> None: """保持唤醒状态并监听语音活动""" Logger.info(f"保持唤醒状态,监听语音活动(超时时间:{self.keep_alive_timeout}秒)") try: - # 监听语音活动,带超时 - voice_detected = await asyncio.wait_for( + # 监听语音活动,带超时,现在返回音频数据 + result = await asyncio.wait_for( self.voice_monitor.listen_for_voice_activity(), timeout=self.keep_alive_timeout ) + voice_detected, pre_collected_audio = result + if voice_detected: Logger.info("检测到语音活动,开始新的对话") - await self.handle_conversation() - # 对话完成后继续保持唤醒状态 + # 将预收集的音频传递给录音器 + if pre_collected_audio is not None: + await self.handle_conversation_with_pre_audio(pre_collected_audio) + else: + await self.handle_conversation() else: Logger.info("未检测到语音活动") self.is_awake = False @@ -100,6 +91,24 @@ async def keep_alive_and_listen(self) -> None: Logger.error(f"语音活动监听出错: {e}") self.is_awake = False + async def handle_conversation_with_pre_audio(self, pre_collected_audio: NDArray[np.float32]) -> None: + """处理带有预收集音频的对话""" + asr_result = await self.voice_recorder.record_with_pre_collected_audio(pre_collected_audio) + if asr_result: + await process_voice_command(asr_result, self.cache_dir) + Logger.info("对话处理完成") + else: + Logger.warning("未获取到有效的语音输入") + + async def handle_conversation(self) -> None: + """处理一次完整的对话""" + asr_result = await self.voice_recorder.record_and_process() + if asr_result: + await process_voice_command(asr_result, self.cache_dir) + Logger.info("对话处理完成") + else: + Logger.warning("未获取到有效的语音输入") + async def stop(self) -> None: """停止程序""" await self.wake_detector.cleanup_resources() diff --git a/src/chatbot/chatter/audio_processing_workflow.py b/src/chatbot/chatter/audio_processing_workflow.py index 86d24cf..f658841 100644 --- a/src/chatbot/chatter/audio_processing_workflow.py +++ b/src/chatbot/chatter/audio_processing_workflow.py @@ -72,7 +72,7 @@ async def process_voice_command(prompt: str, cache_dir: Path) -> None: # 创建任务 producer = asyncio.create_task(sentence_producer(prompt)) tts = asyncio.create_task(tts_worker(cache_dir / "tts")) - # play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) + play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) # 等待生产者完成 await producer @@ -83,4 +83,4 @@ async def process_voice_command(prompt: str, cache_dir: Path) -> None: # 取消剩余任务 tts.cancel() - # play.cancel() + play.cancel() diff --git a/src/chatbot/chatter/voice_activity_monitor.py b/src/chatbot/chatter/voice_activity_monitor.py index 5f465c5..45a5447 100644 --- a/src/chatbot/chatter/voice_activity_monitor.py +++ b/src/chatbot/chatter/voice_activity_monitor.py @@ -1,17 +1,21 @@ from __future__ import annotations + import asyncio import time from typing import TYPE_CHECKING + import numpy as np import sounddevice as sd +import soundfile as sf + from chatbot.api.async_api import async_file_to_opus, async_get_vad_response from chatbot.chatter import CHANNELS, RATE from chatbot.console.logger import Logger from chatbot.tools.timed_helper import get_time_tag_with_millis -import soundfile as sf if TYPE_CHECKING: from pathlib import Path + from numpy.typing import NDArray @@ -21,6 +25,7 @@ def __init__(self, cache_dir: Path): self.cache_dir.mkdir(parents=True, exist_ok=True) self.monitoring = False self.audio_frames: list[NDArray[np.float32]] = [] + self.all_collected_frames: list[NDArray[np.float32]] = [] # 保存所有收集的音频 self.check_interval = 1.0 # 每秒检查一次 self.min_audio_length = 0.5 # 最小音频长度(秒) @@ -32,6 +37,7 @@ def audio_callback( Logger.warning(f"音频回调状态: {status}") if self.monitoring: self.audio_frames.append(indata.copy()) + self.all_collected_frames.append(indata.copy()) # 同时保存到总集合中 async def save_audio_segment(self, audio_data: NDArray[np.float32]) -> Path: """保存音频片段为Opus文件""" @@ -82,10 +88,11 @@ async def check_voice_activity(self, audio_segment: NDArray[np.float32]) -> bool Logger.error(f"VAD检查出错: {e}") return False - async def listen_for_voice_activity(self) -> bool: - """监听语音活动,返回是否检测到语音""" + async def listen_for_voice_activity(self) -> tuple[bool, NDArray[np.float32] | None]: + """监听语音活动,返回是否检测到语音以及收集的音频数据""" self.monitoring = True self.audio_frames = [] + self.all_collected_frames = [] # 重置总音频收集 # 计算每次检查需要的样本数 samples_per_check = int(RATE * self.check_interval) @@ -127,7 +134,14 @@ async def listen_for_voice_activity(self) -> bool: if has_voice: Logger.info("检测到语音活动!") - return True + # 返回检测到语音活动以及到目前为止收集的所有音频 + if self.all_collected_frames: + collected_audio = np.concatenate(self.all_collected_frames, axis=0) + duration = len(collected_audio) / RATE + Logger.info(f"返回已收集的音频数据,时长: {duration:.2f} 秒") + return True, collected_audio + else: + return True, None # 清理已检查的帧,保留一些重叠 overlap_samples = int(samples_per_check * 0.2) # 20%重叠 @@ -144,7 +158,7 @@ async def listen_for_voice_activity(self) -> bool: self.audio_frames = remaining_frames - return False + return False, None finally: stream.stop() diff --git a/src/chatbot/chatter/voice_recorder.py b/src/chatbot/chatter/voice_recorder.py index 0cb439a..b5537da 100644 --- a/src/chatbot/chatter/voice_recorder.py +++ b/src/chatbot/chatter/voice_recorder.py @@ -309,3 +309,107 @@ async def record_with_voice_trigger(self) -> None | str: Logger.warning("没有收集到音频数据") return None + + # 在 VoiceRecorder 类中添加这个方法 + + async def record_with_pre_collected_audio(self, pre_collected_audio: NDArray[np.float32]) -> None | str: + """使用预收集的音频开始录音""" + pre_duration = len(pre_collected_audio) / RATE + Logger.info(f"开始录音,包含预收集音频 {pre_duration:.2f} 秒") + + # 重置录音状态 + self.recording = True + self.audio_frames = [] + self.all_audio_frames = [] + self.segments_to_process = [] + self.audio_length = 0 + + # 将预收集的音频添加到总音频中 + # 需要将预收集的音频分割成帧格式 + frame_size = 1024 # 假设的帧大小,可以根据实际情况调整 + pre_frames = [] + for i in range(0, len(pre_collected_audio), frame_size): + end_idx = min(i + frame_size, len(pre_collected_audio)) + frame = pre_collected_audio[i:end_idx].reshape(-1, 1) # 确保是正确的形状 + pre_frames.append(frame) + + self.all_audio_frames.extend(pre_frames) + Logger.info(f"已添加预收集音频帧数: {len(pre_frames)}") + + # 计算每个片段的样本数 + samples_per_segment = int(RATE * SEGMENT_DURATION) + + # 启动录音流 + stream = sd.InputStream(samplerate=RATE, channels=CHANNELS, callback=self.audio_callback) + start_time = time.time() + stream.start() + + # 立即开始VAD处理 + segment_processor = asyncio.create_task(self.process_segments()) + segment_index = 0 + segment_buffer: list[NDArray[np.float32]] = [] + + # 将预收集的音频也加入到分段处理中 + segment_buffer.extend(pre_frames) + + # 录音主循环 + while self.recording: + current_time = time.time() + self.audio_length = current_time - start_time + pre_duration # 包含预收集音频的时长 + + # 检查最大录音时间 + if self.audio_length > MAX_RECORDING_TIME: + Logger.info(f"达到最大录音时间 ({MAX_RECORDING_TIME}s),停止录音") + self.recording = False + break + + # 收集当前帧 + current_frames = [] + if self.audio_frames: + current_frames = self.audio_frames.copy() + self.audio_frames = [] + self.all_audio_frames.extend(current_frames) + segment_buffer.extend(current_frames) + + # 处理音频片段 + total_samples = sum(len(frame) for frame in segment_buffer) + if total_samples >= samples_per_segment: + frames_for_segment: list[NDArray[np.float32]] = [] + segment_samples = 0 + + for i, frame in enumerate(segment_buffer): + frames_for_segment.append(frame) + segment_samples += len(frame) + if segment_samples >= samples_per_segment: + segment_buffer = segment_buffer[i + 1 :] + break + + if frames_for_segment: + segment_data = np.concatenate(frames_for_segment, axis=0) + self.segments_to_process.append((segment_data, segment_index)) + Logger.info(f"添加片段 {segment_index} 到处理队列") + segment_index += 1 + + await asyncio.sleep(0.1) + + # 停止录音 + Logger.info("停止录音...") + stream.stop() + stream.close() + + # 等待片段处理完成 + self.processing_segments = False + await segment_processor + + # 处理完整音频 + if self.all_audio_frames and len(self.all_audio_frames) > 0: + if self.audio_frames: + self.all_audio_frames.extend(self.audio_frames) + + full_audio = np.concatenate(self.all_audio_frames, axis=0) + full_duration = len(full_audio) / RATE + Logger.info(f"完整录音时长(含预收集): {full_duration:.2f} 秒") + return await self.save_and_process_full_audio(full_audio) + + Logger.warning("没有收集到音频数据") + return None From ac31d02358cae97165f1a4dd0bfc00dd0c021c91 Mon Sep 17 00:00:00 2001 From: MrXnneHang Date: Fri, 13 Jun 2025 07:33:04 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chatbot/api/async_api.py | 271 ++++++++++++++---- .../chatter/audio_processing_workflow.py | 41 ++- 2 files changed, 249 insertions(+), 63 deletions(-) diff --git a/src/chatbot/api/async_api.py b/src/chatbot/api/async_api.py index d8602e8..6337ad7 100644 --- a/src/chatbot/api/async_api.py +++ b/src/chatbot/api/async_api.py @@ -14,7 +14,7 @@ from chatbot._dictionary import session_keys from chatbot.config_manager import ServiceSettings, load_settings_file -from chatbot.console.logger import Logger +from chatbot.console.logger import Badge, Logger from chatbot.tools.audio import file_to_opus, file_to_wav, play_opus_file if TYPE_CHECKING: @@ -47,75 +47,244 @@ async def get_openai_response_stream( stop: list[str] | None = None, presence_penalty: float = 0, frequency_penalty: float = 0, + max_retries: int = 3, + first_sentence_timeout: float = 3.5, + request_timeout: float = 30.0, ): """ 获取OpenAI API的响应(流式,异步) + 修复重试时用户消息丢失的问题 """ settings: ServiceSettings = load_settings_file("config.toml", ServiceSettings) OPENAI_API_KEY: str = settings.sdk_key OPENAI_ENDPOINT: str = settings.sdk_base_url + "/v1/chat/completions" + headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json", } + + # 准备消息 - 只在第一次添加 if len(st.session_state[session_keys["short_term_memory"]]) == 0: - # 如果短期记忆为空,添加系统提示 st.session_state[session_keys["short_term_memory"]].append({"role": "system", "content": SYSTEMPROMOT}) - st.session_state[session_keys["short_term_memory"]].append({"role": "user", "content": prompt}) - data = { - "model": model, - "messages": st.session_state[session_keys["short_term_memory"]], - "max_tokens": max_tokens, - "temperature": temperature, - "n": n, - "stop": stop, - "presence_penalty": presence_penalty, - "frequency_penalty": frequency_penalty, - "stream": True, - } - buffer = "" - t_start = time.monotonic() # 发起请求前的时间 - first_sentence_time = None - async with aiohttp.ClientSession() as session: - async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp: - async for line in resp.content: - decoded = line.decode("utf-8").strip() - if not decoded or not decoded.startswith("data: "): - continue - data_str = decoded[6:] - if data_str.strip() == "[DONE]": - if buffer: - yield buffer - break - try: - chunk = json.loads(data_str) - if "choices" in chunk and chunk["choices"]: - content = chunk["choices"][0]["delta"].get("content", "") - buffer += content - while True: - m = re.search(r"[。!?!?\.]", buffer) - if m: - sentence = buffer[: m.end()].strip().replace("\n", "") - # 第一次分句,记录耗时 - if first_sentence_time is None: - first_sentence_time = time.monotonic() - Logger.debug(f"首句耗时: {first_sentence_time - t_start:.3f} 秒") - st.session_state[session_keys["text_response"]] += sentence - yield sentence - buffer = buffer[m.end() :] - else: + # 保存原始的用户消息,用于重试 + user_message = {"role": "user", "content": prompt} + + # 重试循环 + for attempt in range(max_retries): + Logger.info(f"发送OpenAI请求 (第 {attempt + 1}/{max_retries} 次)") + + try: + # 重置响应文本 + st.session_state[session_keys["text_response"]] = "" + first_sentence_received = False + + # 确保用户消息在短期记忆中(每次重试都重新添加) + # 先检查最后一条是否是相同的用户消息,如果不是则添加 + if ( + not st.session_state[session_keys["short_term_memory"]] + or st.session_state[session_keys["short_term_memory"]][-1] != user_message + ): + st.session_state[session_keys["short_term_memory"]].append(user_message) + + # 准备请求数据 + data = { + "model": model, + "messages": st.session_state[session_keys["short_term_memory"]], + "max_tokens": max_tokens, + "temperature": temperature, + "n": n, + "stop": stop, + "presence_penalty": presence_penalty, + "frequency_penalty": frequency_penalty, + "stream": True, + } + + # 添加调试日志,显示发送的消息 + Logger.info(f"发送的消息: {data['messages']}") + + # 使用统一的超时时间控制整个首句获取过程 + async def get_first_sentence_with_timeout(): + nonlocal first_sentence_received + + buffer = "" + t_start = time.monotonic() + first_sentence_time = None + + # 设置超时 + timeout = aiohttp.ClientTimeout( + total=request_timeout, connect=first_sentence_timeout - 0.5, sock_read=request_timeout + ) + + async with aiohttp.ClientSession(timeout=timeout) as session: + Logger.debug("正在建立连接...") + + async with session.post(OPENAI_ENDPOINT, headers=headers, json=data) as resp: + Logger.debug(f"连接已建立,状态码: {resp.status}") + + if resp.status != 200: + error_text = await resp.text() + raise Exception(f"HTTP {resp.status}: {error_text}") + + Logger.debug("开始接收响应流...") + + async for line in resp.content: + decoded = line.decode("utf-8").strip() + if not decoded or not decoded.startswith("data: "): + continue + + data_str = decoded[6:] + if data_str.strip() == "[DONE]": + if buffer.strip() and not first_sentence_received: + Logger.debug(f"流结束,返回剩余内容: {repr(buffer)}") + return buffer.strip(), buffer, resp + break + + try: + chunk = json.loads(data_str) + if "choices" in chunk and chunk["choices"]: + content = chunk["choices"][0]["delta"].get("content", "") + if content: + Logger.debug(f"收到内容块: {repr(content)}") + buffer += content + + # 检查是否形成完整句子 + m = re.search(r"[。!?!?\.]", buffer) + if m: + sentence = buffer[: m.end()].strip().replace("\n", "") + if sentence: + first_sentence_time = time.monotonic() + first_sentence_received = True + Logger.debug(f"首句耗时: {first_sentence_time - t_start:.3f} 秒") + Logger.debug(f"生成首句: {repr(sentence)}") + return sentence, buffer[m.end() :], resp + except json.JSONDecodeError as e: + Logger.warning(f"JSON解析错误: {e}") + continue + except Exception as e: + Logger.error(f"解析响应块时出错: {e}") + continue + + # 如果没有找到完整句子但有内容 + if buffer.strip(): + Logger.debug(f"未找到完整句子,返回现有内容: {repr(buffer)}") + return buffer.strip(), "", resp + + return None, "", resp + + # 使用统一的超时时间 + Logger.debug(f"等待首句响应(总超时时间: {first_sentence_timeout}秒)...") + + try: + result = await asyncio.wait_for(get_first_sentence_with_timeout(), timeout=first_sentence_timeout) + + first_sentence, remaining_buffer, resp = result + + if first_sentence: + Logger.info(f"首句接收成功: {repr(first_sentence)}") + st.session_state[session_keys["text_response"]] += first_sentence + yield first_sentence + + # 首句成功后,处理剩余内容 + Logger.debug("首句成功,继续处理后续内容...") + + try: + buffer = remaining_buffer + + # 继续处理剩余的响应流 + async for line in resp.content: + decoded = line.decode("utf-8").strip() + if not decoded or not decoded.startswith("data: "): + continue + + data_str = decoded[6:] + if data_str.strip() == "[DONE]": + if buffer.strip(): + Logger.debug(f"处理剩余内容: {repr(buffer)}") + st.session_state[session_keys["text_response"]] += buffer.strip() + yield buffer.strip() break - except Exception as e: - Logger.error(f"{e}") - # 可选,总耗时打印 - t_end = time.monotonic() - Logger.debug(f"openai 总耗时: {t_end - t_start:.3f} 秒") + try: + chunk = json.loads(data_str) + if "choices" in chunk and chunk["choices"]: + content = chunk["choices"][0]["delta"].get("content", "") + if content: + buffer += content + + # 检查是否形成完整句子 + while True: + m = re.search(r"[。!?!?\.]", buffer) + if m: + sentence = buffer[: m.end()].strip().replace("\n", "") + if sentence: + st.session_state[session_keys["text_response"]] += sentence + Logger.debug(f"后续句子: {repr(sentence)}") + yield sentence + buffer = buffer[m.end() :] + else: + break + except json.JSONDecodeError as e: + Logger.warning(f"JSON解析错误: {e}") + continue + except Exception as e: + Logger.error(f"解析后续响应时出错: {e}") + continue + + except Exception as e: + Logger.warning(f"处理后续内容时出错: {e}") + + # 成功完成 + t_end = time.monotonic() + Logger.debug(f"openai 总耗时: {t_end - time.monotonic():.3f} 秒") + + st.session_state[session_keys["short_term_memory"]].append( + {"role": "assistant", "content": st.session_state[session_keys["text_response"]]} + ) + Logger.debug(f"短期记忆:{st.session_state[session_keys['short_term_memory']]}") + + Logger.info("OpenAI请求成功完成") + return + + else: + Logger.warning("未收到有效的首句内容") + + except asyncio.TimeoutError: + Logger.warning(f"首句获取总超时({first_sentence_timeout}秒)") + + except Exception as e: + Logger.error(f"第 {attempt + 1} 次请求出现异常: {e}") + import traceback + + Logger.debug(f"异常详情: {traceback.format_exc()}") + + # 重试逻辑 + if not first_sentence_received and attempt < max_retries - 1: + # 从短期记忆中移除当前的用户消息,下次循环会重新添加 + if ( + st.session_state[session_keys["short_term_memory"]] + and st.session_state[session_keys["short_term_memory"]][-1]["role"] == "user" + ): + st.session_state[session_keys["short_term_memory"]].pop() + + wait_time = 0.5 + attempt * 0.5 + Logger.info(f"等待 {wait_time} 秒后重试...") + await asyncio.sleep(wait_time) + continue + elif first_sentence_received: + Logger.info("首句已成功接收") + return + else: + break + + # 所有重试失败 + Logger.error("所有重试都失败了,返回默认响应") + st.session_state[session_keys["text_response"]] = "抱歉,我现在无法正常回应,请稍后再试。" st.session_state[session_keys["short_term_memory"]].append( {"role": "assistant", "content": st.session_state[session_keys["text_response"]]} ) - Logger.debug(f"短期记忆:{st.session_state[session_keys['short_term_memory']]}") + yield "抱歉,我现在无法正常回应,请稍后再试。" async def async_get_tts_response(text: str): diff --git a/src/chatbot/chatter/audio_processing_workflow.py b/src/chatbot/chatter/audio_processing_workflow.py index f658841..69bfaab 100644 --- a/src/chatbot/chatter/audio_processing_workflow.py +++ b/src/chatbot/chatter/audio_processing_workflow.py @@ -66,21 +66,38 @@ async def play_worker(cache_dir: Path) -> None: async def process_voice_command(prompt: str, cache_dir: Path) -> None: """处理语音命令的完整流程""" + Logger.info(f"开始处理语音命令: {prompt}") # 添加这行日志 + # 确保缓存目录存在 (cache_dir / "tts").mkdir(parents=True, exist_ok=True) - # 创建任务 - producer = asyncio.create_task(sentence_producer(prompt)) - tts = asyncio.create_task(tts_worker(cache_dir / "tts")) - play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) + try: + # 创建任务 + producer = asyncio.create_task(sentence_producer(prompt)) + tts = asyncio.create_task(tts_worker(cache_dir / "tts")) + play = asyncio.create_task(play_worker(cache_dir=cache_dir / "tts")) + + Logger.info("已创建处理任务,等待生产者完成...") # 添加这行日志 + + # 等待生产者完成 + await producer + + Logger.info("生产者完成,等待队列处理...") # 添加这行日志 + + # 等待所有队列处理完毕 + await st.session_state[session_keys["sentence_que"]].join() + await st.session_state[session_keys["tts_que"]].join() + + Logger.info("队列处理完成,取消剩余任务...") # 添加这行日志 + + # 取消剩余任务 + tts.cancel() + play.cancel() - # 等待生产者完成 - await producer + Logger.info("语音命令处理完成") # 添加这行日志 - # 等待所有队列处理完毕 - await st.session_state[session_keys["sentence_que"]].join() - await st.session_state[session_keys["tts_que"]].join() + except Exception as e: + Logger.error(f"处理语音命令时出错: {e}") + import traceback - # 取消剩余任务 - tts.cancel() - play.cancel() + Logger.error(f"错误详情: {traceback.format_exc()}") From 75d6d6a7cc64085bf59c94607d6803076f0bc012 Mon Sep 17 00:00:00 2001 From: MrXnneHang Date: Fri, 13 Jun 2025 07:39:03 +0800 Subject: [PATCH 6/7] =?UTF-8?q?solve=20retry=20=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chatbot/api/async_api.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/chatbot/api/async_api.py b/src/chatbot/api/async_api.py index 6337ad7..1e9b81d 100644 --- a/src/chatbot/api/async_api.py +++ b/src/chatbot/api/async_api.py @@ -64,7 +64,6 @@ async def get_openai_response_stream( "Content-Type": "application/json", } - # 准备消息 - 只在第一次添加 if len(st.session_state[session_keys["short_term_memory"]]) == 0: st.session_state[session_keys["short_term_memory"]].append({"role": "system", "content": SYSTEMPROMOT}) @@ -73,7 +72,9 @@ async def get_openai_response_stream( # 重试循环 for attempt in range(max_retries): - Logger.info(f"发送OpenAI请求 (第 {attempt + 1}/{max_retries} 次)") + Logger.custom( + f"发送OpenAI请求 (第 {attempt + 1}/{max_retries} 次)", badge=Badge("连接", fore="black", back="cyan") + ) try: # 重置响应文本 From f0650f9c7795cbfbfbca25549c8d7ee7c28f0f59 Mon Sep 17 00:00:00 2001 From: MrXnneHang Date: Fri, 13 Jun 2025 09:29:15 +0800 Subject: [PATCH 7/7] ignore lint. --- src/chatbot/__main__.py | 8 +++++++- src/chatbot/api/async_api.py | 8 +++++--- src/chatbot/chatter/voice_activity_monitor.py | 2 +- src/chatbot/chatter/voice_recorder.py | 1 + 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/chatbot/__main__.py b/src/chatbot/__main__.py index 95ec497..63fc482 100644 --- a/src/chatbot/__main__.py +++ b/src/chatbot/__main__.py @@ -1,9 +1,11 @@ +# type: ignore from __future__ import annotations import asyncio import logging import signal from pathlib import Path +from typing import TYPE_CHECKING from chatbot.chatter.audio_processing_workflow import initialize_session_state, process_voice_command from chatbot.chatter.voice_activity_monitor import VoiceActivityMonitor # 新增 @@ -12,6 +14,10 @@ from chatbot.config_manager import ServiceSettings, load_settings_file from chatbot.console.logger import Logger, set_logger_debug +if TYPE_CHECKING: + import numpy as np + from numpy.typing import NDArray + # 配置日志 def configure_logging() -> None: @@ -84,7 +90,7 @@ async def keep_alive_and_listen(self) -> None: Logger.info("未检测到语音活动") self.is_awake = False - except asyncio.TimeoutError: + except TimeoutError: Logger.info(f"{self.keep_alive_timeout}秒内未检测到语音活动,进入睡眠状态") self.is_awake = False except Exception as e: diff --git a/src/chatbot/api/async_api.py b/src/chatbot/api/async_api.py index 1e9b81d..c45456b 100644 --- a/src/chatbot/api/async_api.py +++ b/src/chatbot/api/async_api.py @@ -1,3 +1,5 @@ +# type: ignore + from __future__ import annotations import asyncio @@ -106,7 +108,7 @@ async def get_openai_response_stream( Logger.info(f"发送的消息: {data['messages']}") # 使用统一的超时时间控制整个首句获取过程 - async def get_first_sentence_with_timeout(): + async def get_first_sentence_with_timeout(data): nonlocal first_sentence_received buffer = "" @@ -178,7 +180,7 @@ async def get_first_sentence_with_timeout(): Logger.debug(f"等待首句响应(总超时时间: {first_sentence_timeout}秒)...") try: - result = await asyncio.wait_for(get_first_sentence_with_timeout(), timeout=first_sentence_timeout) + result = await asyncio.wait_for(get_first_sentence_with_timeout(data), timeout=first_sentence_timeout) first_sentence, remaining_buffer, resp = result @@ -251,7 +253,7 @@ async def get_first_sentence_with_timeout(): else: Logger.warning("未收到有效的首句内容") - except asyncio.TimeoutError: + except TimeoutError: Logger.warning(f"首句获取总超时({first_sentence_timeout}秒)") except Exception as e: diff --git a/src/chatbot/chatter/voice_activity_monitor.py b/src/chatbot/chatter/voice_activity_monitor.py index 45a5447..26262a5 100644 --- a/src/chatbot/chatter/voice_activity_monitor.py +++ b/src/chatbot/chatter/voice_activity_monitor.py @@ -1,7 +1,7 @@ +# type: ignore from __future__ import annotations import asyncio -import time from typing import TYPE_CHECKING import numpy as np diff --git a/src/chatbot/chatter/voice_recorder.py b/src/chatbot/chatter/voice_recorder.py index b5537da..2c935fc 100644 --- a/src/chatbot/chatter/voice_recorder.py +++ b/src/chatbot/chatter/voice_recorder.py @@ -1,3 +1,4 @@ +# type: ignore from __future__ import annotations import asyncio