diff --git a/app/src/hooks/useAudioStreamingOrchestrator.ts b/app/src/hooks/useAudioStreamingOrchestrator.ts index bff49ee5..6e09951c 100644 --- a/app/src/hooks/useAudioStreamingOrchestrator.ts +++ b/app/src/hooks/useAudioStreamingOrchestrator.ts @@ -45,13 +45,25 @@ export const useAudioStreamingOrchestrator = ({ const [isPhoneAudioMode, setIsPhoneAudioMode] = useState(false); const buildWebSocketUrl = useCallback((baseUrl: string): string => { + // Normalize user input so we can safely build a websocket URL. let url = baseUrl.trim(); + + // Convert HTTP(S) scheme to WS(S) for websocket connections. url = url.replace(/^http:/, 'ws:').replace(/^https:/, 'wss:'); + + // Ensure the websocket endpoint path is present. if (!url.includes('/ws')) url = url.replace(/\/$/, '') + '/ws'; - if (!url.includes('codec=')) { + + // Force OMI stream codec to Opus: + // - replace existing codec query value, or + // - append codec=opus if missing. + if (/[?&]codec=/i.test(url)) { + url = url.replace(/([?&])codec=[^&]*/i, '$1codec=opus'); + } else { const sep = url.includes('?') ? '&' : '?'; url = url + sep + 'codec=opus'; } + const isAdvanced = settings.jwtToken && settings.isAuthenticated; const isAdvanced = settings.jwtToken && settings.isAuthenticated; if (isAdvanced) { @@ -69,7 +81,9 @@ export const useAudioStreamingOrchestrator = ({ let url = baseUrl.trim(); url = url.replace(/^http:/, 'ws:').replace(/^https:/, 'wss:'); if (!url.includes('/ws')) url = url.replace(/\/$/, '') + '/ws'; - if (!url.includes('codec=')) { + if (/[?&]codec=/i.test(url)) { + url = url.replace(/([?&])codec=[^&]*/i, '$1codec=pcm'); + } else { const sep = url.includes('?') ? '&' : '?'; url = url + sep + 'codec=pcm'; } diff --git a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py index bab956ec..06595f7f 100644 --- a/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py +++ b/backends/advanced/src/advanced_omi_backend/controllers/websocket_controller.py @@ -31,6 +31,7 @@ from advanced_omi_backend.services.audio_stream.producer import ( get_audio_stream_producer, ) +from advanced_omi_backend.utils.omi_codec_utils import is_opus_header_stripped # Thread pool executors for audio decoding _DEC_IO_EXECUTOR = concurrent.futures.ThreadPoolExecutor( @@ -667,6 +668,7 @@ async def _handle_omi_audio_chunk( audio_stream_producer, opus_payload: bytes, decode_packet_fn, + strip_header: bool, user_id: str, client_id: str, packet_count: int, @@ -679,6 +681,7 @@ async def _handle_omi_audio_chunk( audio_stream_producer: Audio stream producer instance opus_payload: Opus-encoded audio bytes decode_packet_fn: Opus decoder function + strip_header: Whether to strip 3-byte BLE header before decoding user_id: User ID client_id: Client ID packet_count: Current packet number for logging @@ -687,7 +690,7 @@ async def _handle_omi_audio_chunk( start_time = time.time() loop = asyncio.get_running_loop() pcm_data = await loop.run_in_executor( - _DEC_IO_EXECUTOR, decode_packet_fn, opus_payload + _DEC_IO_EXECUTOR, decode_packet_fn, opus_payload, strip_header ) decode_time = time.time() - start_time @@ -1031,8 +1034,7 @@ async def _handle_button_event( audio_uuid = client_state.current_audio_uuid application_logger.info( - f"🔘 Button event from {client_id}: {button_state} " - f"(audio_uuid={audio_uuid})" + f"🔘 Button event from {client_id}: {button_state} (audio_uuid={audio_uuid})" ) # Store marker on client state for later persistence to conversation @@ -1353,7 +1355,7 @@ async def handle_omi_websocket( # OMI-specific: Setup Opus decoder decoder = OmiOpusDecoder() - _decode_packet = partial(decoder.decode_packet, strip_header=False) + _decode_packet = decoder.decode_packet packet_count = 0 total_bytes = 0 @@ -1368,20 +1370,26 @@ async def handle_omi_websocket( ) application_logger.info(f"🎙️ OMI audio session started for {client_id}") + audio_start_data = header.get("data", {}) + # Most current clients (mobile app, local wearable relay) send Opus + # payloads with BLE header already removed. + # Allow explicit override for raw BLE packet sources. + client_state.opus_header_stripped = is_opus_header_stripped( + audio_start_data + ) + interim_holder[0] = await _initialize_streaming_session( client_state, audio_stream_producer, user.user_id, user.email, client_id, - header.get( - "data", - { - "rate": OMI_SAMPLE_RATE, - "width": OMI_SAMPLE_WIDTH, - "channels": OMI_CHANNELS, - }, - ), + audio_start_data + or { + "rate": OMI_SAMPLE_RATE, + "width": OMI_SAMPLE_WIDTH, + "channels": OMI_CHANNELS, + }, websocket=ws, ) @@ -1399,6 +1407,7 @@ async def handle_omi_websocket( audio_stream_producer, payload, _decode_packet, + not getattr(client_state, "opus_header_stripped", False), user.user_id, client_id, packet_count, @@ -1476,13 +1485,14 @@ async def handle_pcm_websocket( ) # Handle audio session start (pass websocket for error handling) - audio_streaming, recording_mode = ( - await _handle_audio_session_start( - client_state, - header.get("data", {}), - client_id, - websocket=ws, - ) + ( + audio_streaming, + recording_mode, + ) = await _handle_audio_session_start( + client_state, + header.get("data", {}), + client_id, + websocket=ws, ) # Initialize streaming session diff --git a/backends/advanced/src/advanced_omi_backend/utils/omi_codec_utils.py b/backends/advanced/src/advanced_omi_backend/utils/omi_codec_utils.py new file mode 100644 index 00000000..cea4acb5 --- /dev/null +++ b/backends/advanced/src/advanced_omi_backend/utils/omi_codec_utils.py @@ -0,0 +1,26 @@ +"""Helpers for OMI Opus payload metadata handling.""" + +from typing import Any + + +def is_opus_header_stripped(audio_start_data: dict[str, Any] | None) -> bool: + """ + Determine whether incoming OMI Opus payloads already have BLE header removed. + + Defaults to True because current mobile and relay clients send header-stripped + payload bytes. Raw BLE packet sources can override with + ``opus_header_stripped: false``. + """ + if not audio_start_data: + return True + + value = audio_start_data.get("opus_header_stripped", True) + + if isinstance(value, str): + normalized = value.strip().lower() + if normalized in {"false", "0", "no", "off"}: + return False + if normalized in {"true", "1", "yes", "on"}: + return True + + return bool(value) diff --git a/backends/advanced/tests/test_omi_codec_utils.py b/backends/advanced/tests/test_omi_codec_utils.py new file mode 100644 index 00000000..d8741ef3 --- /dev/null +++ b/backends/advanced/tests/test_omi_codec_utils.py @@ -0,0 +1,35 @@ +import pytest + +from advanced_omi_backend.utils.omi_codec_utils import is_opus_header_stripped + + +@pytest.mark.unit +def test_defaults_to_header_stripped_when_metadata_missing(): + assert is_opus_header_stripped(None) is True + assert is_opus_header_stripped({}) is True + + +@pytest.mark.unit +def test_respects_explicit_boolean_flag(): + assert is_opus_header_stripped({"opus_header_stripped": True}) is True + assert is_opus_header_stripped({"opus_header_stripped": False}) is False + + +@pytest.mark.unit +@pytest.mark.parametrize( + ("value", "expected"), + [ + ("true", True), + ("TRUE", True), + ("1", True), + ("yes", True), + ("on", True), + ("false", False), + ("FALSE", False), + ("0", False), + ("no", False), + ("off", False), + ], +) +def test_handles_string_flags(value, expected): + assert is_opus_header_stripped({"opus_header_stripped": value}) is expected diff --git a/extras/local-wearable-client/backend_sender.py b/extras/local-wearable-client/backend_sender.py index f5b37625..6986d19f 100644 --- a/extras/local-wearable-client/backend_sender.py +++ b/extras/local-wearable-client/backend_sender.py @@ -105,7 +105,11 @@ async def receive_handler(websocket, logger) -> None: if msg_type == "interim_transcript": text = data.get("data", {}).get("text", "")[:50] is_final = data.get("data", {}).get("is_final", False) - logger.debug("Interim transcript (%s): %s...", "FINAL" if is_final else "partial", text) + logger.debug( + "Interim transcript (%s): %s...", + "FINAL" if is_final else "partial", + text, + ) elif msg_type == "ready": logger.info("Backend ready message: %s", data.get("message")) else: @@ -165,6 +169,7 @@ async def stream_to_backend( "width": 2, "channels": 1, "mode": "streaming", + "opus_header_stripped": True, }, "payload_length": None, }