from dataclasses import dataclass
from typing import Iterator

import numpy as np

from app.logger_config import (
    logger as logging,
    DEBUG,  # noqa: F401  (not referenced below; kept so existing re-imports keep working)
)
from app.interfaces import IStreamingSpeechEngine, IVoiceActivityEngine


@dataclass
class StreamingAudioProcessorConfig:
    """Configuration settings for the StreamingAudioProcessor."""

    # Samples per physical chunk fed to process_chunk(). Used for logging and
    # for the approximate timestamp reported on VAD resets.
    read_size: int = 8000
    # Consecutive silent physical chunks required before a segment is flushed.
    silence_threshold_chunks: int = 2
    # Sample rate (Hz) of the incoming audio.
    sample_rate: int = 16000
    # Add other streaming-related parameters here
    # e.g., vad_padding_chunks: int = 0


class StreamingAudioProcessor:
    """
    Manages streaming transcription by combining a speech engine and a
    voice activity detector (VAD).

    Physical chunks are accumulated in an internal buffer and re-cut into
    the "logical" chunk sizes the speech engine expects. The first logical
    chunk of a segment also includes the engine's right (look-ahead) context.
    After enough consecutive silent chunks, the buffer remainder is flushed
    to the engine and all per-segment state is reset.
    """

    def __init__(self, speech_engine: IStreamingSpeechEngine, vad_engine: IVoiceActivityEngine, cfg: StreamingAudioProcessorConfig):
        """
        Initializes the streaming processor.

        Args:
            speech_engine: The ASR speech engine. It must provide
                ``transcribe_chunk(chunk, is_last_chunk=...)``, which returns an
                iterable of ``(segment, text)`` pairs, plus ``reset()`` and the
                sample counts ``context_samples.chunk`` and
                ``context_samples.right``.
            vad_engine: The VAD engine. It is called with a chunk and returns a
                truthy value when the chunk contains speech.
            cfg: The configuration object for this processor.

        Raises:
            ValueError: If the engine's logical chunk size is not positive.
        """
        logging.info("Initializing StreamingAudioProcessor...")
        self.speech_engine = speech_engine
        self.vad_engine = vad_engine

        # Store config
        self.VAD_SAMPLE_RATE = cfg.sample_rate
        self.read_size = cfg.read_size
        self.SILENCE_THRESHOLD_CHUNKS = cfg.silence_threshold_chunks

        # Samples received but not yet sent to the speech engine.
        self.internal_buffer = np.array([], dtype='int16')

        # Internal logic state: True until the current segment's first logical
        # chunk (chunk + right context) has actually been sent to the engine.
        self.is_first_logical_chunk = True
        self.logical_chunk_size = self.speech_engine.context_samples.chunk
        self.initial_logical_chunk_size = self.speech_engine.context_samples.chunk + self.speech_engine.context_samples.right
        if self.logical_chunk_size <= 0:
            # A zero or negative size would make the drain loop in
            # process_chunk() spin forever on empty chunks.
            raise ValueError(f"speech engine chunk size must be positive, got {self.logical_chunk_size}")

        # Internal VAD state
        self.silent_chunks_count = 0
        self.chunks_count = 0

        logging.info(f"  Config: VAD Sample Rate={self.VAD_SAMPLE_RATE}Hz")
        logging.info(f"  Config: Physical Read Size={self.read_size} samples")
        logging.info(f"  Config: Silence Threshold={self.SILENCE_THRESHOLD_CHUNKS} chunks")
        logging.info(f"  Config: Initial Logical Chunk={self.initial_logical_chunk_size} samples")
        logging.info(f"  Config: Subsequent Logical Chunk={self.logical_chunk_size} samples")

    def _target_chunk_size(self) -> int:
        """Returns the size of the next logical chunk the speech engine expects."""
        return self.initial_logical_chunk_size if self.is_first_logical_chunk else self.logical_chunk_size

    def _pop_logical_chunk(self, asr_chunk_len):
        """
        Removes and returns the first ``asr_chunk_len`` buffered samples.
        Returns None (and leaves the buffer untouched) if fewer are buffered.
        """
        if len(self.internal_buffer) < asr_chunk_len:
            logging.debug(f"Buffer size ({len(self.internal_buffer)}) < target ({asr_chunk_len}). Holding.")
            return None
        asr_signal_chunk = self.internal_buffer[:asr_chunk_len]
        self.internal_buffer = self.internal_buffer[asr_chunk_len:]
        logging.debug(f"Extracted logical chunk of {len(asr_signal_chunk)} samples. Buffer remaining: {len(self.internal_buffer)}.")
        return asr_signal_chunk

    def _append_to_buffer(self, chunk_np, asr_chunk_len):
        """
        Appends a NumPy chunk to the internal buffer.

        Returns:
            The first ``asr_chunk_len`` buffered samples if enough audio is
            buffered (they are removed from the buffer), otherwise None.
        """
        logging.debug(f"Appending {len(chunk_np)} samples to internal buffer (current size: {len(self.internal_buffer)}).")
        self.internal_buffer = np.concatenate((self.internal_buffer, chunk_np))
        return self._pop_logical_chunk(asr_chunk_len)

    def _flush_and_reset(self) -> Iterator[str]:
        """
        Flushes the remaining buffer to the transcriber as the last chunk, then
        resets the speech engine, the buffer and the per-segment VAD state.

        Yields:
            str: Each text produced by the final ``transcribe_chunk`` call,
            followed by one trailing empty string.
        """
        if len(self.internal_buffer) > 0:
            final_segment_chunk = self.internal_buffer
            logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
        else:
            # The buffer is empty, but send a silent "flush" anyway to force
            # the transcriber to finalize its internal state.
            logging.info("Buffer empty, sending silent flush to finalize segment.")
            final_segment_chunk = np.zeros(self.logical_chunk_size, dtype='int16')

        # NOTE(review): the reset below runs only once this generator is fully
        # consumed. A caller that stops iterating early skips it.
        for _seg, new_text in self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True):
            yield new_text

        # Full state reset
        logging.debug("Resetting speech engine state...")
        self.speech_engine.reset()  # Resets the speech engine (decoder state)

        logging.debug("Resetting internal buffer and VAD state.")
        self.internal_buffer = np.array([], dtype='int16')
        self.is_first_logical_chunk = True
        self.silent_chunks_count = 0
        yield ""

    def process_chunk(self, chunk: np.ndarray) -> Iterator[str]:
        """
        Processes a single physical chunk (e.g., 8000 samples).
        Manages VAD, buffering, and transcription.

        This is a generator: it does nothing until it is iterated.

        Args:
            chunk (np.ndarray): The audio chunk (int16).

        Yields:
            str: Newly transcribed text segments. Empty strings are also
            yielded as "no new text" placeholders (when no logical chunk was
            ready, when no VAD reset happened, and once at the very end).
        """
        self.chunks_count += 1
        logging.debug(f"--- Processing Physical Chunk {self.chunks_count} ---")

        # --- 1. VAD Logic ---
        has_speech = self.vad_engine(chunk)
        logging.debug(f"VAD result: {'SPEECH' if has_speech else 'SILENCE'}")
        if has_speech:
            self.silent_chunks_count = 0
        else:
            self.silent_chunks_count += 1
            logging.debug(f"Silent chunks count: {self.silent_chunks_count}/{self.SILENCE_THRESHOLD_CHUNKS}")
        silence_reset = self.silent_chunks_count >= self.SILENCE_THRESHOLD_CHUNKS

        # --- 2. Buffering & Transcription Logic ---
        logical_chunk_sent = False
        asr_chunk_np = self._append_to_buffer(chunk, self._target_chunk_size())
        while asr_chunk_np is not None:
            logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
            for _seg, new_text in self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False):
                logging.info(f"Received new text segment: '{new_text}'")
                yield new_text
            # Only now has this segment's first (look-ahead) chunk been consumed.
            self.is_first_logical_chunk = False
            logical_chunk_sent = True
            # One physical chunk may contain several logical chunks. Drain them
            # all so the buffer (and the latency) cannot grow without bound.
            asr_chunk_np = self._pop_logical_chunk(self._target_chunk_size())
        if not logical_chunk_sent:
            yield ""

        # --- 3. VAD Reset Logic ---
        # Flush only after at least one logical chunk of this segment has been
        # sent. Until then, buffered audio keeps accumulating.
        if silence_reset and not self.is_first_logical_chunk:
            logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")
            # Flush the buffer, reset state, and forward the final text.
            for reset_text in self._flush_and_reset():
                logging.info(f"Received final reset text: '{reset_text}'")
                yield reset_text
        else:
            yield ""
        yield ""

    def finalize_stream(self) -> Iterator[str]:
        """
        Must be called at the very end of the stream (after the loop breaks).
        Flushes anything remaining in the buffer and resets all state.

        Yields:
            str: The final texts produced by the flush, followed by an empty
            string.
        """
        logging.info("Finalizing stream. Flushing final buffer...")
        for reset_text in self._flush_and_reset():
            logging.info(f"Received final flushed text: '{reset_text}'")
            yield reset_text