From 73e07ef00f968f5e6ca308b1060b2cd9faf869db Mon Sep 17 00:00:00 2001 From: Lorna Armstrong Date: Fri, 2 Jan 2026 09:54:54 +0000 Subject: [PATCH 1/6] Bug Fix for 8-bit PCM and Refactoring SileroVAD --- sdk/voice/speechmatics/voice/_vad.py | 186 +++++++++++++++++++-------- 1 file changed, 131 insertions(+), 55 deletions(-) diff --git a/sdk/voice/speechmatics/voice/_vad.py b/sdk/voice/speechmatics/voice/_vad.py index e5a7b1e..b78e909 100644 --- a/sdk/voice/speechmatics/voice/_vad.py +++ b/sdk/voice/speechmatics/voice/_vad.py @@ -225,87 +225,163 @@ def process_chunk(self, chunk_f32: np.ndarray) -> float: # Return probability (out shape is (1, 1)) return float(out[0][0]) - - async def process_audio(self, audio_bytes: bytes, sample_rate: int = 16000, sample_width: int = 2) -> None: - """Process incoming audio bytes and invoke callback on state changes. - - This method buffers incomplete chunks and processes all complete 512-sample chunks. - The callback is invoked only once at the end if the VAD state changed during processing. + + def _validate_input(self, sample_rate: int) -> bool: + """ + Ensures the VAD is ready and the incoming audio format + matches the model's requirements. Args: - audio_bytes: Raw audio bytes (int16 PCM). - sample_rate: Sample rate of the audio (must be 16000). - sample_width: Sample width in bytes (2 for int16). - """ + sample_rate: Sample rate of the incoming audio. + Returns: + True if the VAD is ready and the incoming sample rate matches the model's requirements. + """ if not self._is_initialized: logger.error("SileroVAD is not initialized") - return + return False if sample_rate != SILERO_SAMPLE_RATE: logger.error(f"Sample rate must be {SILERO_SAMPLE_RATE}Hz, got {sample_rate}Hz") - return + return False + + return True - # Add new bytes to buffer - self._audio_buffer += audio_bytes + def _get_audio_chunks(self, sample_width: int): + """ + A generator that yields complete 512-sample chunks from the buffer. + Incomplete data remains in the buffer for the next call. + + Args: + sample_width: Sample width of the incoming audio. - # Calculate bytes per chunk (512 samples * 2 bytes for int16) + Yields: + Complete 512-sample chunks from the buffer. + """ + # Calculate bytes needed for a full model window bytes_per_chunk = SILERO_CHUNK_SIZE * sample_width - # Process all complete chunks in buffer while len(self._audio_buffer) >= bytes_per_chunk: - # Extract one chunk - chunk_bytes = self._audio_buffer[:bytes_per_chunk] + # Extract the chunk from the front of the buffer + chunk = self._audio_buffer[:bytes_per_chunk] self._audio_buffer = self._audio_buffer[bytes_per_chunk:] - # Convert bytes to int16 array - dtype = np.int16 if sample_width == 2 else np.int8 - int16_array: np.ndarray = np.frombuffer(chunk_bytes, dtype=dtype).astype(np.int16) + yield chunk - # Convert int16 to float32 in range [-1, 1] - float32_array: np.ndarray = int16_array.astype(np.float32) / 32768.0 + def _prepare_chunk(self, chunk_bytes: bytes, sample_width: int) -> np.ndarray: + """ + Translates raw PCM bytes into a normalised float32 array in the range [-1, 1], + compatible with the Silero VAD model. - try: - # Process the chunk and add probability to rolling window - probability = self.process_chunk(float32_array) - self._prediction_window.append(probability) + Args: + chunk_bytes: Audio bytes to be processed. + sample_width: Sample width of the incoming audio. - except Exception as e: - logger.error(f"Error processing VAD chunk: {e}") + Returns: + Normalised float32 array in the range [-1, 1]. + """ + if sample_width == 2: + dtype = np.int16 + divisor = 32768.0 + elif sample_width == 1: + dtype = np.int8 + divisor = 128.0 + else: + raise ValueError(f"Unsupported sample_width {sample_width}") + + # Decode and normalize the chunk data + int_array = np.frombuffer(chunk_bytes, dtype=dtype) + float32_array: np.ndarray = int_array.astype(np.float32) / divisor + + return float32_array + + def _evaluate_activity_change(self) -> None: + """ + Analyzes the prediction window of probabilities to determine if the user has started + or stopped speaking. If the state has changed, emit a VADStatusMessage. + + Returns: + None + """ + if len(self._prediction_window) == 0: + return - # After processing all chunks, calculate weighted average from window - if len(self._prediction_window) > 0: - # Calculate weighted average (most recent predictions have higher weight) - weights = np.arange(1, len(self._prediction_window) + 1, dtype=np.float32) - weighted_avg = np.average(list(self._prediction_window), weights=weights) + # Calculate weighted average (most recent predictions have higher weight) + probs = list(self._prediction_window) + weights = np.arange(1, len(probs) + 1, dtype=np.float32) + weighted_avg = np.average(probs, weights=weights) - # Determine speech state from weighted average - is_speech = bool(weighted_avg >= self._threshold) + # Determine speech state from weighted average + is_speech = bool(weighted_avg >= self._threshold) - # Check if state changed - state_changed = self._last_is_speech != is_speech + # Check if state changed + state_changed = self._last_is_speech != is_speech + if state_changed: + self._dispatch_vad_event(is_speech, weighted_avg) - # Emit callback if state changed - if state_changed and self._on_state_change: - # Calculate transition duration (window duration) - transition_duration = len(self._prediction_window) * SILERO_CHUNK_DURATION_MS + # Update state after emitting + self._last_is_speech = is_speech - # Determine if speech ended - speech_ended = self._last_is_speech and not is_speech + def _dispatch_vad_event(self, is_speech: bool, probability: float) -> None: + """ + Constructs the result object and executes the on_state_change callback + function if set. - # VAD result - result = SileroVADResult( - is_speech=is_speech, - probability=round(float(weighted_avg), 3), - transition_duration_ms=transition_duration, - speech_ended=speech_ended, - ) + Args: + is_speech: True if speech is detected, False otherwise. + probability: Speech probability (0.0-1.0). + """ + if not self._on_state_change: + return + + # Calculate how many milliseconds of audio the window represents + duration_ms = len(self._prediction_window) * SILERO_CHUNK_DURATION_MS - # Trigger callback - self._on_state_change(result) + # Determine if speech has ended + speech_ended = self._last_is_speech and not is_speech - # Update state after emitting - self._last_is_speech = is_speech + # Create VAD result + result = SileroVADResult( + is_speech=is_speech, + probability=round(float(probability), 3), + transition_duration_ms=duration_ms, + speech_ended=speech_ended, + ) + + # Trigger callback with result + self._on_state_change(result) + + + async def process_audio(self, audio_bytes: bytes, sample_rate: int = 16000, sample_width: int = 2) -> None: + """Process incoming audio bytes and invoke callback on state changes. + + This method buffers incomplete chunks and processes all complete 512-sample chunks. + The callback is invoked only once at the end if the VAD state changed during processing. + + Args: + audio_bytes: Raw audio bytes (int16 PCM). + sample_rate: Sample rate of the audio (must be 16000). + sample_width: Sample width in bytes (2 for int16). + """ + if not self._validate_input(sample_rate): + return + + # Add new bytes to the buffer + self._audio_buffer += audio_bytes + + # Process all complete chunks in the buffer + for chunk in self._get_audio_chunks(sample_width): + audio_f32 = self._prepare_chunk(chunk, sample_width) + + try: + probability = self.process_chunk(audio_f32) + self._prediction_window.append(probability) + except Exception as e: + logger.error(f"Error processing VAD chunk: {e}") + continue + + # Check if VAD state has changed + self._evaluate_activity_change() def reset(self) -> None: """Reset the VAD state and clear audio buffer.""" From 45358c9e32e4ff8e2736e9660aafdc8a4d5ffc33 Mon Sep 17 00:00:00 2001 From: Lorna Armstrong Date: Mon, 5 Jan 2026 10:47:59 +0000 Subject: [PATCH 2/6] Clean up VAD evaluation of activity change --- sdk/voice/speechmatics/voice/_vad.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/sdk/voice/speechmatics/voice/_vad.py b/sdk/voice/speechmatics/voice/_vad.py index b78e909..5f6eaf5 100644 --- a/sdk/voice/speechmatics/voice/_vad.py +++ b/sdk/voice/speechmatics/voice/_vad.py @@ -316,24 +316,27 @@ def _evaluate_activity_change(self) -> None: # Check if state changed state_changed = self._last_is_speech != is_speech - if state_changed: - self._dispatch_vad_event(is_speech, weighted_avg) - # Update state after emitting - self._last_is_speech = is_speech + if not state_changed: + # No change, exit early + return + + # Trigger callback function for state change + if self._on_state_change: + self._trigger_on_state_change(is_speech, weighted_avg) + + # Update state + self._last_is_speech = is_speech - def _dispatch_vad_event(self, is_speech: bool, probability: float) -> None: + def _trigger_on_state_change(self, is_speech: bool, probability: float) -> None: """ Constructs the result object and executes the on_state_change callback - function if set. + function. Args: is_speech: True if speech is detected, False otherwise. probability: Speech probability (0.0-1.0). """ - if not self._on_state_change: - return - # Calculate how many milliseconds of audio the window represents duration_ms = len(self._prediction_window) * SILERO_CHUNK_DURATION_MS @@ -351,7 +354,6 @@ def _dispatch_vad_event(self, is_speech: bool, probability: float) -> None: # Trigger callback with result self._on_state_change(result) - async def process_audio(self, audio_bytes: bytes, sample_rate: int = 16000, sample_width: int = 2) -> None: """Process incoming audio bytes and invoke callback on state changes. From 067a7608f8cc13fc9d1429934d4a833ea54e3994 Mon Sep 17 00:00:00 2001 From: Lorna Armstrong Date: Mon, 5 Jan 2026 10:49:25 +0000 Subject: [PATCH 3/6] Add input validation to put_frame function --- sdk/voice/speechmatics/voice/_audio.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdk/voice/speechmatics/voice/_audio.py b/sdk/voice/speechmatics/voice/_audio.py index 6653db9..033e496 100644 --- a/sdk/voice/speechmatics/voice/_audio.py +++ b/sdk/voice/speechmatics/voice/_audio.py @@ -90,7 +90,8 @@ async def put_bytes(self, data: bytes) -> None: data: The data frame to add to the buffer. """ - # If the right length and buffer zero + # If data is exactly one frame and there's no buffered remainder, + # put the frame directly into the buffer. if len(data) // self._sample_width == self._frame_size and len(self._buffer) == 0: return await self.put_frame(data) @@ -109,19 +110,23 @@ async def put_bytes(self, data: bytes) -> None: await self.put_frame(frame) async def put_frame(self, data: bytes) -> None: - """Add data to the buffer. + """Add data frame to the buffer. - New data added to the end of the buffer. The oldest data is removed - to maintain the total number of seconds in the buffer. + New data frame is added to the end of the buffer. The oldest data is removed + to maintain the total number of seconds in the buffer.` Args: data: The data frame to add to the buffer. """ + # Verify number of bytes matches frame size + if len(data) != self._frame_bytes: + raise ValueError(f"Invalid frame size: {len(data)} bytes, expected {self._frame_bytes} bytes") # Add data to the buffer async with self._lock: self._frames.append(data) self._total_frames += 1 + # Trim to rolling window, keep last _max_frames frames if len(self._frames) > self._max_frames: self._frames = self._frames[-self._max_frames :] From a6db8735fe8368da0525a1b9ef581730e4cd082f Mon Sep 17 00:00:00 2001 From: Lorna Armstrong Date: Tue, 6 Jan 2026 15:56:15 +0000 Subject: [PATCH 4/6] Fix Comment Typos --- sdk/voice/speechmatics/voice/_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/voice/speechmatics/voice/_utils.py b/sdk/voice/speechmatics/voice/_utils.py index ecb01d0..c8467b3 100644 --- a/sdk/voice/speechmatics/voice/_utils.py +++ b/sdk/voice/speechmatics/voice/_utils.py @@ -110,7 +110,7 @@ def segment_list_from_fragments( speaker_groups.append([]) speaker_groups[-1].append(frag) - # Create SpeakerFragments objects + # Create SpeakerSegment objects segments: list[SpeakerSegment] = [] for group in speaker_groups: # Skip if the group is empty @@ -143,7 +143,7 @@ def segment_list_from_fragments( FragmentUtils.update_segment_text(session=session, segment=segment) segments.append(segment) - # Return the grouped SpeakerFragments objects + # Return the grouped SpeakerSegment objects return segments @staticmethod From 5e815ae301d8042893b53ad5fc5042ee716a8b5d Mon Sep 17 00:00:00 2001 From: Lorna Armstrong Date: Wed, 7 Jan 2026 08:42:41 +0000 Subject: [PATCH 5/6] Fix Message Naming --- sdk/voice/speechmatics/voice/_models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/voice/speechmatics/voice/_models.py b/sdk/voice/speechmatics/voice/_models.py index 5f819b9..853fe51 100644 --- a/sdk/voice/speechmatics/voice/_models.py +++ b/sdk/voice/speechmatics/voice/_models.py @@ -142,9 +142,9 @@ class AgentServerMessageType(str, Enum): StartOfTurn: Start of turn has been detected. EndOfTurnPrediction: End of turn prediction timing. EndOfTurn: End of turn has been detected. - SmartTurn: Smart turn metadata. + SmartTurnResult: Smart turn metadata. SpeakersResult: Speakers result has been detected. - Metrics: Metrics for the STT engine. + SessionMetrics: Metrics for the STT engine. SpeakerMetrics: Metrics relating to speakers. Examples: From 08a86bb8b85722f972f95cd473b2c1a3292ecd23 Mon Sep 17 00:00:00 2001 From: Lorna Armstrong Date: Mon, 12 Jan 2026 09:34:05 +0000 Subject: [PATCH 6/6] Linting and Smart Turn Refactor --- sdk/voice/speechmatics/voice/_smart_turn.py | 98 ++++++++++++++------- sdk/voice/speechmatics/voice/_vad.py | 20 ++--- 2 files changed, 76 insertions(+), 42 deletions(-) diff --git a/sdk/voice/speechmatics/voice/_smart_turn.py b/sdk/voice/speechmatics/voice/_smart_turn.py index 9ce44a0..7ee047f 100644 --- a/sdk/voice/speechmatics/voice/_smart_turn.py +++ b/sdk/voice/speechmatics/voice/_smart_turn.py @@ -78,6 +78,9 @@ class SmartTurnDetector: Further information at https://github.com/pipecat-ai/smart-turn """ + WINDOW_SECONDS = 8 + DEFAULT_SAMPLE_RATE = 16000 + def __init__(self, auto_init: bool = True, threshold: float = 0.8): """Create the new SmartTurnDetector. @@ -125,7 +128,7 @@ def setup(self) -> None: self.session = self.build_session(SMART_TURN_MODEL_LOCAL_PATH) # Load the feature extractor - self.feature_extractor = WhisperFeatureExtractor(chunk_length=8) + self.feature_extractor = WhisperFeatureExtractor(chunk_length=self.WINDOW_SECONDS) # Set initialized self._is_initialized = True @@ -156,83 +159,113 @@ def build_session(self, onnx_path: str) -> ort.InferenceSession: # Return the new session return ort.InferenceSession(onnx_path, sess_options=so) - async def predict( - self, audio_array: bytes, language: str, sample_rate: int = 16000, sample_width: int = 2 - ) -> SmartTurnPredictionResult: - """Predict whether an audio segment is complete (turn ended) or incomplete. + def _prepare_audio(self, audio_array: bytes, sample_rate: int, sample_width: int) -> np.ndarray: + """Prepare the audio for inference. Args: audio_array: Numpy array containing audio samples at 16kHz. The function will convert the audio into float32 and truncate to 8 seconds (keeping the end) or pad to 8 seconds. - language: Language of the audio. sample_rate: Sample rate of the audio. sample_width: Sample width of the audio. Returns: - Prediction result containing completion status and probability. + Numpy array containing audio samples at 16kHz. """ - - # Check if initialized - if not self._is_initialized: - return SmartTurnPredictionResult(error="SmartTurnDetector is not initialized") - - # Check a valid language - if not self.valid_language(language): - logger.warning(f"Invalid language: {language}. Results may be unreliable.") - - # Record start time - start_time = datetime.datetime.now() - # Convert into numpy array dtype = np.int16 if sample_width == 2 else np.int8 int16_array: np.ndarray = np.frombuffer(audio_array, dtype=dtype).astype(np.int16) - # Truncate to last 8 seconds if needed (keep the tail/end of audio) - max_samples = 8 * sample_rate + # Truncate to last WINDOW_SECONDS seconds if needed (keep the tail/end of audio) + max_samples = self.WINDOW_SECONDS * sample_rate if len(int16_array) > max_samples: int16_array = int16_array[-max_samples:] # Convert int16 to float32 in range [-1, 1] (same as reference implementation) float32_array: np.ndarray = int16_array.astype(np.float32) / 32768.0 - # Process audio using Whisper's feature extractor + return float32_array + + def _get_input_features(self, audio_data: np.ndarray, sample_rate: int) -> np.ndarray: + """ + Get the input features for the audio data using Whisper's feature extractor. + + Args: + audio_data: Numpy array containing audio samples. + sample_rate: Sample rate of the audio. + """ + inputs = self.feature_extractor( - float32_array, + audio_data, sampling_rate=sample_rate, return_tensors="np", padding="max_length", - max_length=max_samples, + max_length= self.WINDOW_SECONDS * sample_rate, truncation=True, do_normalize=True, ) - # Extract features and ensure correct shape for ONNX + # Ensure dimensions are correct shape for ONNX input_features = inputs.input_features.squeeze(0).astype(np.float32) input_features = np.expand_dims(input_features, axis=0) - # Run ONNX inference - outputs = self.session.run(None, {"input_features": input_features}) + return input_features + + async def predict( + self, audio_array: bytes, language: str, sample_rate: int = DEFAULT_SAMPLE_RATE, sample_width: int = 2 + ) -> SmartTurnPredictionResult: + """Predict whether an audio segment is complete (turn ended) or incomplete. + + Args: + audio_array: Numpy array containing audio samples at 16kHz. The function + will convert the audio into float32 and truncate to 8 seconds (keeping the end) + or pad to 8 seconds. + language: Language of the audio. + sample_rate: Sample rate of the audio. + sample_width: Sample width of the audio. + + Returns: + Prediction result containing completion status and probability. + """ - # Extract probability (ONNX model returns sigmoid probabilities) + # Check if initialized + if not self._is_initialized: + return SmartTurnPredictionResult(error="SmartTurnDetector is not initialized") + + # Check a valid language + if not self.valid_language(language): + logger.warning(f"Invalid language: {language}. Results may be unreliable.") + + # Record start time + start_time = datetime.datetime.now() + + # Convert the audio into required format + prepared_audio = self._prepare_audio(audio_array, sample_rate, sample_width) + + # Feature extraction + input_features = self._get_input_features(prepared_audio, sample_rate) + + # Model inference + outputs = self.session.run(None, {"input_features": input_features}) probability = outputs[0][0].item() # Make prediction (True for Complete, False for Incomplete) prediction = probability >= self._threshold - # Record end time + # Result Formatting end_time = datetime.datetime.now() + duration = float((end_time - start_time).total_seconds()) # Return the result return SmartTurnPredictionResult( prediction=prediction, probability=round(probability, 3), - processing_time=round(float((end_time - start_time).total_seconds()), 3), + processing_time=round(duration, 3), ) @staticmethod def truncate_audio_to_last_n_seconds( - audio_array: np.ndarray, n_seconds: float = 8.0, sample_rate: int = 16000 + audio_array: np.ndarray, n_seconds: float = 8.0, sample_rate: int = DEFAULT_SAMPLE_RATE ) -> np.ndarray: """Truncate audio to last n seconds or pad with zeros to meet n seconds. @@ -300,7 +333,8 @@ def model_exists() -> bool: @staticmethod def valid_language(language: str) -> bool: - """Check if the language is valid. + """Check if the language is valid against list of supported languages + for the Pipecat model. Args: language: Language code to validate. diff --git a/sdk/voice/speechmatics/voice/_vad.py b/sdk/voice/speechmatics/voice/_vad.py index 5f6eaf5..9ea4306 100644 --- a/sdk/voice/speechmatics/voice/_vad.py +++ b/sdk/voice/speechmatics/voice/_vad.py @@ -225,7 +225,7 @@ def process_chunk(self, chunk_f32: np.ndarray) -> float: # Return probability (out shape is (1, 1)) return float(out[0][0]) - + def _validate_input(self, sample_rate: int) -> bool: """ Ensures the VAD is ready and the incoming audio format @@ -244,7 +244,7 @@ def _validate_input(self, sample_rate: int) -> bool: if sample_rate != SILERO_SAMPLE_RATE: logger.error(f"Sample rate must be {SILERO_SAMPLE_RATE}Hz, got {sample_rate}Hz") return False - + return True def _get_audio_chunks(self, sample_width: int): @@ -282,17 +282,17 @@ def _prepare_chunk(self, chunk_bytes: bytes, sample_width: int) -> np.ndarray: """ if sample_width == 2: dtype = np.int16 - divisor = 32768.0 + divisor = 32768.0 elif sample_width == 1: dtype = np.int8 divisor = 128.0 else: raise ValueError(f"Unsupported sample_width {sample_width}") - # Decode and normalize the chunk data + # Decode and normalize the chunk data int_array = np.frombuffer(chunk_bytes, dtype=dtype) float32_array: np.ndarray = int_array.astype(np.float32) / divisor - + return float32_array def _evaluate_activity_change(self) -> None: @@ -353,7 +353,7 @@ def _trigger_on_state_change(self, is_speech: bool, probability: float) -> None: # Trigger callback with result self._on_state_change(result) - + async def process_audio(self, audio_bytes: bytes, sample_rate: int = 16000, sample_width: int = 2) -> None: """Process incoming audio bytes and invoke callback on state changes. @@ -367,21 +367,21 @@ async def process_audio(self, audio_bytes: bytes, sample_rate: int = 16000, samp """ if not self._validate_input(sample_rate): return - + # Add new bytes to the buffer self._audio_buffer += audio_bytes - + # Process all complete chunks in the buffer for chunk in self._get_audio_chunks(sample_width): audio_f32 = self._prepare_chunk(chunk, sample_width) - + try: probability = self.process_chunk(audio_f32) self._prediction_window.append(probability) except Exception as e: logger.error(f"Error processing VAD chunk: {e}") continue - + # Check if VAD state has changed self._evaluate_activity_change()