"""MrrrMe Backend - Speech Processing Pipeline (COMPLETE)""" import requests import numpy as np from typing import Optional, Dict from ..models.loader import get_models from ..session.manager import save_message from ..utils.helpers import get_avatar_api_url from ..config import HALLUCINATION_PHRASES, MIN_TRANSCRIPTION_LENGTH, EMOTION_MAP AVATAR_API = get_avatar_api_url() def filter_transcription(transcription: str) -> tuple: """ Filter out short or meaningless transcriptions Returns: (should_process, reason_if_skipped) """ # Check length if len(transcription) < MIN_TRANSCRIPTION_LENGTH: return False, f"Too short ({len(transcription)} chars)" # Check for hallucinations cleaned = transcription.lower().strip('.,!?') if cleaned in HALLUCINATION_PHRASES: return False, f"Hallucination detected ('{transcription}')" return True, None async def process_speech_end( websocket, transcription: str, session_id: str, user_id: str, username: str, user_summary: Optional[str], user_preferences: Dict ) -> None: """ Complete speech processing pipeline Steps: 1. Filter transcription 2. Save user message 3. Get emotion from all 3 modalities 4. Fuse emotions with quality-aware weights 5. Generate LLM response 6. Request avatar TTS 7. Send to client """ print(f"\n{'='*80}", flush=True) print(f"[Speech End] USER FINISHED SPEAKING: {username}", flush=True) print(f"{'='*80}", flush=True) print(f"[Transcription] '{transcription}'", flush=True) # Get latest model instances models = get_models() face_processor = models['face_processor'] text_analyzer = models['text_analyzer'] voice_worker = models['voice_worker'] llm_generator = models['llm_generator'] fusion_engine = models['fusion_engine'] # Step 1: Filter transcription should_process, skip_reason = filter_transcription(transcription) if not should_process: print(f"[Filter] Skipped: {skip_reason}", flush=True) return # Step 2: Save user message save_message(session_id, "user", transcription) try: # ========== EMOTION DETECTION PIPELINE ========== print(f"\n[Pipeline] Starting emotion analysis pipeline...", flush=True) print(f"{'─'*80}", flush=True) # Step 3: Get face emotion print(f"[Step 1/4] FACIAL EXPRESSION ANALYSIS", flush=True) face_emotion = face_processor.get_last_emotion() or "Neutral" face_confidence = face_processor.get_last_confidence() or 0.0 face_quality = getattr(face_processor, 'get_last_quality', lambda: 0.5)() # Create emotion probabilities face_probs = np.array([0.25, 0.25, 0.25, 0.25], dtype=np.float32) if face_emotion in EMOTION_MAP: face_idx = EMOTION_MAP[face_emotion] face_probs[face_idx] = face_confidence face_probs = face_probs / face_probs.sum() print(f" Result: {face_emotion}", flush=True) print(f" Confidence: {face_confidence:.3f}", flush=True) print(f" Quality Score: {face_quality:.3f}", flush=True) print(f" Distribution: Neutral={face_probs[0]:.3f} | Happy={face_probs[1]:.3f} | Sad={face_probs[2]:.3f} | Angry={face_probs[3]:.3f}", flush=True) # Step 4: Get voice emotion print(f"\n[Step 2/4] VOICE TONE ANALYSIS", flush=True) voice_probs, voice_emotion = voice_worker.get_probs() voice_state = voice_worker.get_state() voice_active = voice_state.get('speech_active', False) voice_inferences = voice_state.get('inference_count', 0) voice_skipped = voice_state.get('skipped_inferences', 0) print(f" {'ACTIVELY PROCESSING' if voice_active else 'IDLE (no recent speech)'}", flush=True) print(f" Result: {voice_emotion}", flush=True) print(f" Distribution: Neutral={voice_probs[0]:.3f} | Happy={voice_probs[1]:.3f} | Sad={voice_probs[2]:.3f} | Angry={voice_probs[3]:.3f}", flush=True) print(f" Inferences completed: {voice_inferences}", flush=True) print(f" Skipped (silence optimization): {voice_skipped}", flush=True) efficiency = (voice_inferences / (voice_inferences + voice_skipped) * 100) if (voice_inferences + voice_skipped) > 0 else 0 print(f" Processing efficiency: {efficiency:.1f}%", flush=True) # Step 5: Analyze text sentiment print(f"\n[Step 3/4] TEXT SENTIMENT ANALYSIS", flush=True) print(f" Using Whisper transcription", flush=True) text_analyzer.analyze(transcription) text_probs, _ = text_analyzer.get_probs() text_emotion = ['Neutral', 'Happy', 'Sad', 'Angry'][int(np.argmax(text_probs))] print(f" Result: {text_emotion}", flush=True) print(f" Distribution: Neutral={text_probs[0]:.3f} | Happy={text_probs[1]:.3f} | Sad={text_probs[2]:.3f} | Angry={text_probs[3]:.3f}", flush=True) print(f" Text length: {len(transcription)} characters", flush=True) # Step 6: Calculate fusion with quality-aware weights print(f"\n[Step 4/4] MULTI-MODAL FUSION", flush=True) base_weights = { 'face': fusion_engine.alpha_face, 'voice': fusion_engine.alpha_voice, 'text': fusion_engine.alpha_text } # Adjust weights based on quality/confidence adjusted_weights = base_weights.copy() adjustments_made = [] # Reduce face weight if quality is poor if face_quality < 0.5: adjusted_weights['face'] *= 0.7 adjustments_made.append(f"Face weight reduced (low quality: {face_quality:.3f})") # Reduce voice weight if not active if not voice_active: adjusted_weights['voice'] *= 0.5 adjustments_made.append(f"Voice weight reduced (no recent speech)") # Reduce text weight if very short if len(transcription) < 10: adjusted_weights['text'] *= 0.7 adjustments_made.append(f"Text weight reduced (short input: {len(transcription)} chars)") # Normalize weights to sum to 1.0 total_weight = sum(adjusted_weights.values()) final_weights = {k: v/total_weight for k, v in adjusted_weights.items()} print(f" Base weights: Face={base_weights['face']:.3f} | Voice={base_weights['voice']:.3f} | Text={base_weights['text']:.3f}", flush=True) if adjustments_made: print(f" Adjustments:", flush=True) for adj in adjustments_made: print(f" - {adj}", flush=True) print(f" Final weights: Face={final_weights['face']:.3f} | Voice={final_weights['voice']:.3f} | Text={final_weights['text']:.3f}", flush=True) # Calculate weighted fusion fused_probs = ( final_weights['face'] * face_probs + final_weights['voice'] * voice_probs + final_weights['text'] * text_probs ) fused_probs = fused_probs / (np.sum(fused_probs) + 1e-8) fused_emotion, intensity = fusion_engine.fuse(face_probs, voice_probs, text_probs) # Calculate fusion metrics agreement_count = sum([ face_emotion == fused_emotion, voice_emotion == fused_emotion, text_emotion == fused_emotion ]) agreement_score = agreement_count / 3.0 all_same = (face_emotion == voice_emotion == text_emotion) has_conflict = len({face_emotion, voice_emotion, text_emotion}) == 3 print(f"\n {'─'*76}", flush=True) print(f" FUSION RESULTS:", flush=True) print(f" {'─'*76}", flush=True) print(f" Input emotions:", flush=True) print(f" Face: {face_emotion:7s} (confidence={face_probs[EMOTION_MAP.get(face_emotion, 0)]:.3f}, weight={final_weights['face']:.3f})", flush=True) print(f" Voice: {voice_emotion:7s} (confidence={voice_probs[EMOTION_MAP.get(voice_emotion, 0)]:.3f}, weight={final_weights['voice']:.3f})", flush=True) print(f" Text: {text_emotion:7s} (confidence={text_probs[EMOTION_MAP.get(text_emotion, 0)]:.3f}, weight={final_weights['text']:.3f})", flush=True) print(f" {'─'*76}", flush=True) print(f" FUSED EMOTION: {fused_emotion}", flush=True) print(f" Intensity: {intensity:.3f}", flush=True) print(f" Fused distribution: Neutral={fused_probs[0]:.3f} | Happy={fused_probs[1]:.3f} | Sad={fused_probs[2]:.3f} | Angry={fused_probs[3]:.3f}", flush=True) print(f" {'─'*76}", flush=True) print(f" Agreement: {agreement_count}/3 modalities ({agreement_score*100:.1f}%)", flush=True) if all_same: print(f" Status: Perfect agreement - all modalities aligned", flush=True) elif has_conflict: print(f" Status: Full conflict - weighted fusion resolved disagreement", flush=True) else: print(f" Status: Partial agreement - majority vote with confidence weighting", flush=True) print(f" {'─'*76}", flush=True) # ========== LLM INPUT PREPARATION ========== print(f"\n[LLM Input] Preparing context for language model...", flush=True) user_language = user_preferences.get("language", "en") context_prefix = "" if user_summary: context_prefix = f"[User context for {username}: {user_summary}]\n\n" print(f"[LLM Input] - User context: YES ({len(user_summary)} chars)", flush=True) else: print(f"[LLM Input] - User context: NO (new user)", flush=True) # Add language instruction if user_language == "nl": context_prefix += "[BELANGRIJK: Antwoord ALTIJD in het Nederlands!]\n\n" print(f"[LLM Input] - Language: Dutch (Nederlands)", flush=True) else: context_prefix += "[IMPORTANT: ALWAYS respond in English!]\n\n" print(f"[LLM Input] - Language: English", flush=True) full_llm_input = context_prefix + transcription print(f"[LLM Input] - Fused emotion: {fused_emotion}", flush=True) print(f"[LLM Input] - Face emotion: {face_emotion}", flush=True) print(f"[LLM Input] - Voice emotion: {voice_emotion}", flush=True) print(f"[LLM Input] - Intensity: {intensity:.3f}", flush=True) print(f"[LLM Input] - User text: '{transcription}'", flush=True) print(f"[LLM Input] - Full prompt length: {len(full_llm_input)} chars", flush=True) if len(context_prefix) > 50: print(f"[LLM Input] - Context preview: '{context_prefix[:100]}...'", flush=True) # Generate LLM response print(f"\n[LLM] Generating response...", flush=True) response_text = llm_generator.generate_response( fused_emotion, face_emotion, voice_emotion, full_llm_input, force=True, intensity=intensity ) print(f"[LLM] Response generated: '{response_text}'", flush=True) # Save assistant message save_message(session_id, "assistant", response_text, fused_emotion) # ========== SEND TO AVATAR FOR TTS ========== print(f"\n[TTS] Sending to avatar backend...", flush=True) try: voice_preference = user_preferences.get("voice", "female") language_preference = user_preferences.get("language", "en") print(f"[TTS] - Voice: {voice_preference}", flush=True) print(f"[TTS] - Language: {language_preference}", flush=True) print(f"[TTS] - Text: '{response_text}'", flush=True) avatar_response = requests.post( f"{AVATAR_API}/speak", data={ "text": response_text, "voice": voice_preference, "language": language_preference }, timeout=45 ) avatar_response.raise_for_status() avatar_data = avatar_response.json() print(f"[TTS] Avatar TTS generated", flush=True) print(f"[TTS] - Audio URL: {avatar_data.get('audio_url', 'N/A')}", flush=True) print(f"[TTS] - Visemes: {len(avatar_data.get('visemes', []))} keyframes", flush=True) await websocket.send_json({ "type": "llm_response", "text": response_text, "emotion": fused_emotion, "intensity": intensity, "audio_url": avatar_data.get("audio_url"), "visemes": avatar_data.get("visemes") }) print(f"[Pipeline] Complete response sent to {username}", flush=True) except requests.exceptions.ConnectionError: print(f"[TTS] Avatar service not available - sending text-only", flush=True) await websocket.send_json({ "type": "llm_response", "text": response_text, "emotion": fused_emotion, "intensity": intensity, "text_only": True }) except Exception as avatar_err: print(f"[TTS] Avatar error: {avatar_err}", flush=True) await websocket.send_json({ "type": "llm_response", "text": response_text, "emotion": fused_emotion, "intensity": intensity, "error": "Avatar TTS failed", "text_only": True }) print(f"{'='*80}\n", flush=True) except Exception as e: print(f"[Pipeline] Error in emotion processing: {e}", flush=True) import traceback traceback.print_exc()