| """MrrrMe Backend - Speech Processing Pipeline (COMPLETE)""" | |
| import requests | |
| import numpy as np | |
| from typing import Optional, Dict | |
| from ..models.loader import get_models | |
| from ..session.manager import save_message | |
| from ..utils.helpers import get_avatar_api_url | |
| from ..config import HALLUCINATION_PHRASES, MIN_TRANSCRIPTION_LENGTH, EMOTION_MAP | |
| AVATAR_API = get_avatar_api_url() | |


def filter_transcription(transcription: str) -> Tuple[bool, Optional[str]]:
    """
    Filter out short or meaningless transcriptions.

    Returns:
        (should_process, reason_if_skipped)
    """
    text = transcription.strip()

    # Check length (after stripping surrounding whitespace)
    if len(text) < MIN_TRANSCRIPTION_LENGTH:
        return False, f"Too short ({len(text)} chars)"

    # Check for known ASR hallucination phrases
    cleaned = text.lower().strip('.,!?')
    if cleaned in HALLUCINATION_PHRASES:
        return False, f"Hallucination detected ('{transcription}')"

    return True, None
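
# Example usage (hypothetical values; the exact outcomes depend on
# MIN_TRANSCRIPTION_LENGTH and HALLUCINATION_PHRASES in config):
#   filter_transcription("hm")                  -> (False, "Too short (2 chars)")
#   filter_transcription("Thank you.")          -> (False, "Hallucination detected ('Thank you.')")
#   filter_transcription("How are you today?")  -> (True, None)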


async def process_speech_end(
    websocket,
    transcription: str,
    session_id: str,
    user_id: str,
    username: str,
    user_summary: Optional[str],
    user_preferences: Dict,
) -> None:
    """
    Complete speech processing pipeline.

    Steps:
        1. Filter transcription
        2. Save user message
        3. Get emotion from all 3 modalities
        4. Fuse emotions with quality-aware weights
        5. Generate LLM response
        6. Request avatar TTS
        7. Send to client
    """
| print(f"\n{'='*80}", flush=True) | |
| print(f"[Speech End] USER FINISHED SPEAKING: {username}", flush=True) | |
| print(f"{'='*80}", flush=True) | |
| print(f"[Transcription] '{transcription}'", flush=True) | |
| # Get latest model instances | |
| models = get_models() | |
| face_processor = models['face_processor'] | |
| text_analyzer = models['text_analyzer'] | |
| voice_worker = models['voice_worker'] | |
| llm_generator = models['llm_generator'] | |
| fusion_engine = models['fusion_engine'] | |
| # Step 1: Filter transcription | |
| should_process, skip_reason = filter_transcription(transcription) | |
| if not should_process: | |
| print(f"[Filter] Skipped: {skip_reason}", flush=True) | |
| return | |
| # Step 2: Save user message | |
| save_message(session_id, "user", transcription) | |
    try:
        # ========== EMOTION DETECTION PIPELINE ==========
        print(f"\n[Pipeline] Starting emotion analysis pipeline...", flush=True)
        print(f"{'─'*80}", flush=True)

        # Step 3: Get face emotion
        print(f"[Step 1/4] FACIAL EXPRESSION ANALYSIS", flush=True)
        face_emotion = face_processor.get_last_emotion() or "Neutral"
        face_confidence = face_processor.get_last_confidence() or 0.0
        # Fall back to a neutral 0.5 if the processor exposes no quality score
        face_quality = getattr(face_processor, 'get_last_quality', lambda: 0.5)()

        # Build a probability distribution: start from a uniform prior, spike
        # the detected emotion with its confidence, then renormalize
        face_probs = np.array([0.25, 0.25, 0.25, 0.25], dtype=np.float32)
        if face_emotion in EMOTION_MAP:
            face_idx = EMOTION_MAP[face_emotion]
            face_probs[face_idx] = face_confidence
            face_probs = face_probs / face_probs.sum()
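
        # Worked example (hypothetical numbers): face_emotion="Happy" at
        # confidence 0.9 turns the uniform prior into [0.25, 0.9, 0.25, 0.25],
        # which normalizes to roughly [0.152, 0.545, 0.152, 0.152].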
| print(f" Result: {face_emotion}", flush=True) | |
| print(f" Confidence: {face_confidence:.3f}", flush=True) | |
| print(f" Quality Score: {face_quality:.3f}", flush=True) | |
| print(f" Distribution: Neutral={face_probs[0]:.3f} | Happy={face_probs[1]:.3f} | Sad={face_probs[2]:.3f} | Angry={face_probs[3]:.3f}", flush=True) | |
| # Step 4: Get voice emotion | |
| print(f"\n[Step 2/4] VOICE TONE ANALYSIS", flush=True) | |
| voice_probs, voice_emotion = voice_worker.get_probs() | |
| voice_state = voice_worker.get_state() | |
| voice_active = voice_state.get('speech_active', False) | |
| voice_inferences = voice_state.get('inference_count', 0) | |
| voice_skipped = voice_state.get('skipped_inferences', 0) | |
| print(f" {'ACTIVELY PROCESSING' if voice_active else 'IDLE (no recent speech)'}", flush=True) | |
| print(f" Result: {voice_emotion}", flush=True) | |
| print(f" Distribution: Neutral={voice_probs[0]:.3f} | Happy={voice_probs[1]:.3f} | Sad={voice_probs[2]:.3f} | Angry={voice_probs[3]:.3f}", flush=True) | |
| print(f" Inferences completed: {voice_inferences}", flush=True) | |
| print(f" Skipped (silence optimization): {voice_skipped}", flush=True) | |
| efficiency = (voice_inferences / (voice_inferences + voice_skipped) * 100) if (voice_inferences + voice_skipped) > 0 else 0 | |
| print(f" Processing efficiency: {efficiency:.1f}%", flush=True) | |
        # Step 5: Analyze text sentiment
        print(f"\n[Step 3/4] TEXT SENTIMENT ANALYSIS", flush=True)
        print(f"  Using Whisper transcription", flush=True)
        text_analyzer.analyze(transcription)
        text_probs, _ = text_analyzer.get_probs()
        # Label order must match the index mapping in EMOTION_MAP
        text_emotion = ['Neutral', 'Happy', 'Sad', 'Angry'][int(np.argmax(text_probs))]
        print(f"  Result: {text_emotion}", flush=True)
        print(f"  Distribution: Neutral={text_probs[0]:.3f} | Happy={text_probs[1]:.3f} | Sad={text_probs[2]:.3f} | Angry={text_probs[3]:.3f}", flush=True)
        print(f"  Text length: {len(transcription)} characters", flush=True)
        # Step 6: Calculate fusion with quality-aware weights
        print(f"\n[Step 4/4] MULTI-MODAL FUSION", flush=True)
        base_weights = {
            'face': fusion_engine.alpha_face,
            'voice': fusion_engine.alpha_voice,
            'text': fusion_engine.alpha_text
        }

        # Adjust weights based on quality/confidence
        adjusted_weights = base_weights.copy()
        adjustments_made = []

        # Reduce face weight if quality is poor
        if face_quality < 0.5:
            adjusted_weights['face'] *= 0.7
            adjustments_made.append(f"Face weight reduced (low quality: {face_quality:.3f})")

        # Reduce voice weight if not active
        if not voice_active:
            adjusted_weights['voice'] *= 0.5
            adjustments_made.append("Voice weight reduced (no recent speech)")

        # Reduce text weight if very short
        if len(transcription) < 10:
            adjusted_weights['text'] *= 0.7
            adjustments_made.append(f"Text weight reduced (short input: {len(transcription)} chars)")

        # Normalize weights to sum to 1.0
        total_weight = sum(adjusted_weights.values())
        final_weights = {k: v / total_weight for k, v in adjusted_weights.items()}
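
        # Worked example (hypothetical base weights 0.4/0.3/0.3): a low-quality
        # face (0.4 * 0.7 = 0.28) plus an idle voice (0.3 * 0.5 = 0.15) leaves
        # raw weights 0.28/0.15/0.30, which normalize to ~0.38/0.21/0.41 and
        # shift trust toward the text modality.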
| print(f" Base weights: Face={base_weights['face']:.3f} | Voice={base_weights['voice']:.3f} | Text={base_weights['text']:.3f}", flush=True) | |
| if adjustments_made: | |
| print(f" Adjustments:", flush=True) | |
| for adj in adjustments_made: | |
| print(f" - {adj}", flush=True) | |
| print(f" Final weights: Face={final_weights['face']:.3f} | Voice={final_weights['voice']:.3f} | Text={final_weights['text']:.3f}", flush=True) | |
| # Calculate weighted fusion | |
| fused_probs = ( | |
| final_weights['face'] * face_probs + | |
| final_weights['voice'] * voice_probs + | |
| final_weights['text'] * text_probs | |
| ) | |
| fused_probs = fused_probs / (np.sum(fused_probs) + 1e-8) | |
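
        # Worked example (hypothetical inputs): with weights 0.38/0.21/0.41,
        # face=[0.1, 0.7, 0.1, 0.1], voice=[0.25, 0.25, 0.25, 0.25] and
        # text=[0.2, 0.6, 0.1, 0.1] blend to roughly [0.17, 0.56, 0.13, 0.13],
        # a clear "Happy" in the fused distribution.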

        # The engine computes the canonical emotion/intensity; the manually
        # weighted fused_probs above only feeds the diagnostic printout below
        fused_emotion, intensity = fusion_engine.fuse(face_probs, voice_probs, text_probs)

        # Calculate fusion metrics
        agreement_count = sum([
            face_emotion == fused_emotion,
            voice_emotion == fused_emotion,
            text_emotion == fused_emotion
        ])
        agreement_score = agreement_count / 3.0
        all_same = (face_emotion == voice_emotion == text_emotion)
        has_conflict = len({face_emotion, voice_emotion, text_emotion}) == 3
| print(f"\n {'─'*76}", flush=True) | |
| print(f" FUSION RESULTS:", flush=True) | |
| print(f" {'─'*76}", flush=True) | |
| print(f" Input emotions:", flush=True) | |
| print(f" Face: {face_emotion:7s} (confidence={face_probs[EMOTION_MAP.get(face_emotion, 0)]:.3f}, weight={final_weights['face']:.3f})", flush=True) | |
| print(f" Voice: {voice_emotion:7s} (confidence={voice_probs[EMOTION_MAP.get(voice_emotion, 0)]:.3f}, weight={final_weights['voice']:.3f})", flush=True) | |
| print(f" Text: {text_emotion:7s} (confidence={text_probs[EMOTION_MAP.get(text_emotion, 0)]:.3f}, weight={final_weights['text']:.3f})", flush=True) | |
| print(f" {'─'*76}", flush=True) | |
| print(f" FUSED EMOTION: {fused_emotion}", flush=True) | |
| print(f" Intensity: {intensity:.3f}", flush=True) | |
| print(f" Fused distribution: Neutral={fused_probs[0]:.3f} | Happy={fused_probs[1]:.3f} | Sad={fused_probs[2]:.3f} | Angry={fused_probs[3]:.3f}", flush=True) | |
| print(f" {'─'*76}", flush=True) | |
| print(f" Agreement: {agreement_count}/3 modalities ({agreement_score*100:.1f}%)", flush=True) | |
| if all_same: | |
| print(f" Status: Perfect agreement - all modalities aligned", flush=True) | |
| elif has_conflict: | |
| print(f" Status: Full conflict - weighted fusion resolved disagreement", flush=True) | |
| else: | |
| print(f" Status: Partial agreement - majority vote with confidence weighting", flush=True) | |
| print(f" {'─'*76}", flush=True) | |
        # ========== LLM INPUT PREPARATION ==========
        print(f"\n[LLM Input] Preparing context for language model...", flush=True)
        user_language = user_preferences.get("language", "en")

        context_prefix = ""
        if user_summary:
            context_prefix = f"[User context for {username}: {user_summary}]\n\n"
            print(f"[LLM Input] - User context: YES ({len(user_summary)} chars)", flush=True)
        else:
            print(f"[LLM Input] - User context: NO (new user)", flush=True)

        # Add language instruction (the Dutch literal reads:
        # "IMPORTANT: ALWAYS respond in Dutch!")
        if user_language == "nl":
            context_prefix += "[BELANGRIJK: Antwoord ALTIJD in het Nederlands!]\n\n"
            print(f"[LLM Input] - Language: Dutch (Nederlands)", flush=True)
        else:
            context_prefix += "[IMPORTANT: ALWAYS respond in English!]\n\n"
            print(f"[LLM Input] - Language: English", flush=True)

        full_llm_input = context_prefix + transcription
        print(f"[LLM Input] - Fused emotion: {fused_emotion}", flush=True)
        print(f"[LLM Input] - Face emotion: {face_emotion}", flush=True)
        print(f"[LLM Input] - Voice emotion: {voice_emotion}", flush=True)
        print(f"[LLM Input] - Intensity: {intensity:.3f}", flush=True)
        print(f"[LLM Input] - User text: '{transcription}'", flush=True)
        print(f"[LLM Input] - Full prompt length: {len(full_llm_input)} chars", flush=True)
        if len(context_prefix) > 50:
            print(f"[LLM Input] - Context preview: '{context_prefix[:100]}...'", flush=True)
        # Generate LLM response
        print(f"\n[LLM] Generating response...", flush=True)
        response_text = llm_generator.generate_response(
            fused_emotion, face_emotion, voice_emotion,
            full_llm_input, force=True, intensity=intensity
        )
        print(f"[LLM] Response generated: '{response_text}'", flush=True)

        # Save assistant message
        save_message(session_id, "assistant", response_text, fused_emotion)
        # ========== SEND TO AVATAR FOR TTS ==========
        print(f"\n[TTS] Sending to avatar backend...", flush=True)
        try:
            voice_preference = user_preferences.get("voice", "female")
            language_preference = user_preferences.get("language", "en")
            print(f"[TTS] - Voice: {voice_preference}", flush=True)
            print(f"[TTS] - Language: {language_preference}", flush=True)
            print(f"[TTS] - Text: '{response_text}'", flush=True)

            avatar_response = requests.post(
                f"{AVATAR_API}/speak",
                data={
                    "text": response_text,
                    "voice": voice_preference,
                    "language": language_preference
                },
                timeout=45
            )
            avatar_response.raise_for_status()
            avatar_data = avatar_response.json()
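
            # Assumed response shape (inferred from the fields read below, not
            # a documented contract):
            #   {"audio_url": "/path/to/audio.wav",
            #    "visemes": [ ...viseme keyframes... ]}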
| print(f"[TTS] Avatar TTS generated", flush=True) | |
| print(f"[TTS] - Audio URL: {avatar_data.get('audio_url', 'N/A')}", flush=True) | |
| print(f"[TTS] - Visemes: {len(avatar_data.get('visemes', []))} keyframes", flush=True) | |
| await websocket.send_json({ | |
| "type": "llm_response", | |
| "text": response_text, | |
| "emotion": fused_emotion, | |
| "intensity": intensity, | |
| "audio_url": avatar_data.get("audio_url"), | |
| "visemes": avatar_data.get("visemes") | |
| }) | |
| print(f"[Pipeline] Complete response sent to {username}", flush=True) | |
        except requests.exceptions.ConnectionError:
            # Degrade gracefully: the client still gets the text response
            print(f"[TTS] Avatar service not available - sending text-only", flush=True)
            await websocket.send_json({
                "type": "llm_response",
                "text": response_text,
                "emotion": fused_emotion,
                "intensity": intensity,
                "text_only": True
            })
        except Exception as avatar_err:
            print(f"[TTS] Avatar error: {avatar_err}", flush=True)
            await websocket.send_json({
                "type": "llm_response",
                "text": response_text,
                "emotion": fused_emotion,
                "intensity": intensity,
                "error": "Avatar TTS failed",
                "text_only": True
            })

        print(f"{'='*80}\n", flush=True)
    except Exception as e:
        # Log the full traceback server-side; note that the client receives
        # no reply when the pipeline itself fails
        print(f"[Pipeline] Error in emotion processing: {e}", flush=True)
        import traceback
        traceback.print_exc()
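

# Usage sketch (illustrative only; the endpoint name and surrounding plumbing
# are assumptions, not part of this module). A FastAPI WebSocket handler would
# typically call the pipeline once a final transcription is available:
#
#   @app.websocket("/ws/{session_id}")
#   async def ws_endpoint(websocket: WebSocket, session_id: str):
#       await websocket.accept()
#       ...
#       await process_speech_end(
#           websocket, transcription, session_id,
#           user_id, username, user_summary, user_preferences,
#       )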