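# NOTE(review): the three lines below look like repository-page residue
# (caption, commit message, commit id) rather than source code; kept as comments.
# michon's picture
# update speech again
# 8174964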
"""MrrrMe Backend - Speech Processing Pipeline (COMPLETE)"""
import requests
import numpy as np
from typing import Optional, Dict
from ..models.loader import get_models
from ..session.manager import save_message
from ..utils.helpers import get_avatar_api_url
from ..config import HALLUCINATION_PHRASES, MIN_TRANSCRIPTION_LENGTH, EMOTION_MAP
# Base URL of the avatar backend, resolved once at import time.
# The pipeline below POSTs to f"{AVATAR_API}/speak" to request TTS audio and visemes.
AVATAR_API = get_avatar_api_url()
def filter_transcription(transcription: str) -> tuple:
    """
    Filter out short or meaningless transcriptions.

    Surrounding whitespace is ignored for both checks, so a padded
    transcription such as " Thank you. " is measured and matched on its
    actual content instead of slipping past the hallucination list.

    Args:
        transcription: Text to evaluate. ``None`` is treated as an empty
            string rather than raising.

    Returns:
        (should_process, reason_if_skipped): ``reason_if_skipped`` is ``None``
        when ``should_process`` is ``True``.
    """
    text = (transcription or "").strip()

    # Check length (on the trimmed text, so padding does not count)
    if len(text) < MIN_TRANSCRIPTION_LENGTH:
        return False, f"Too short ({len(text)} chars)"

    # Check for hallucinations: compare case-insensitively, ignoring
    # leading/trailing punctuation and any whitespace left next to it.
    cleaned = text.lower().strip('.,!?').strip()
    if cleaned in HALLUCINATION_PHRASES:
        return False, f"Hallucination detected ('{transcription}')"

    return True, None
async def process_speech_end(
    websocket,
    transcription: str,
    session_id: str,
    user_id: str,
    username: str,
    user_summary: Optional[str],
    user_preferences: Dict
) -> None:
    """
    Complete speech processing pipeline.

    Steps:
        1. Filter transcription
        2. Save user message
        3. Get emotion from all 3 modalities
        4. Fuse emotions with quality-aware weights
        5. Generate LLM response
        6. Request avatar TTS
        7. Send to client

    Args:
        websocket: Connection object exposing an awaitable ``send_json``;
            receives the final ``llm_response`` message.
        transcription: Text of what the user just said.
        session_id: Session key passed to ``save_message`` for both the user
            and the assistant message.
        user_id: Accepted for interface compatibility; not used in this
            function.
        username: Used in log output and in the LLM context prefix.
        user_summary: Optional summary of the user; when truthy it is
            prepended to the LLM prompt.
        user_preferences: Read for ``"language"`` (default ``"en"``; ``"nl"``
            selects the Dutch instruction) and ``"voice"`` (default
            ``"female"``).

    Returns:
        None. Results are delivered through ``websocket.send_json``.

    Notes:
        Exceptions raised after the user message has been saved are caught,
        logged with a traceback and not re-raised. If the avatar request
        fails, a text-only response is still sent to the client.
    """
    # Local import, mirroring the local `import traceback` below.
    import asyncio

    print(f"\n{'='*80}", flush=True)
    print(f"[Speech End] USER FINISHED SPEAKING: {username}", flush=True)
    print(f"{'='*80}", flush=True)
    print(f"[Transcription] '{transcription}'", flush=True)

    # Get latest model instances
    models = get_models()
    face_processor = models['face_processor']
    text_analyzer = models['text_analyzer']
    voice_worker = models['voice_worker']
    llm_generator = models['llm_generator']
    fusion_engine = models['fusion_engine']

    # Step 1: Filter transcription
    should_process, skip_reason = filter_transcription(transcription)
    if not should_process:
        print(f"[Filter] Skipped: {skip_reason}", flush=True)
        return

    # Step 2: Save user message
    save_message(session_id, "user", transcription)

    try:
        # ========== EMOTION DETECTION PIPELINE ==========
        print(f"\n[Pipeline] Starting emotion analysis pipeline...", flush=True)
        print(f"{'─'*80}", flush=True)

        # Step 3: Get face emotion
        print(f"[Step 1/4] FACIAL EXPRESSION ANALYSIS", flush=True)
        face_emotion = face_processor.get_last_emotion() or "Neutral"
        face_confidence = face_processor.get_last_confidence() or 0.0
        # Processors without a quality getter fall back to a mid-range 0.5.
        face_quality = getattr(face_processor, 'get_last_quality', lambda: 0.5)()

        # Create emotion probabilities: start uniform, then overwrite the
        # detected class with its confidence and renormalise.
        face_probs = np.array([0.25, 0.25, 0.25, 0.25], dtype=np.float32)
        if face_emotion in EMOTION_MAP:
            face_idx = EMOTION_MAP[face_emotion]
            face_probs[face_idx] = face_confidence
            face_probs = face_probs / face_probs.sum()

        print(f" Result: {face_emotion}", flush=True)
        print(f" Confidence: {face_confidence:.3f}", flush=True)
        print(f" Quality Score: {face_quality:.3f}", flush=True)
        print(f" Distribution: Neutral={face_probs[0]:.3f} | Happy={face_probs[1]:.3f} | Sad={face_probs[2]:.3f} | Angry={face_probs[3]:.3f}", flush=True)

        # Step 4: Get voice emotion
        print(f"\n[Step 2/4] VOICE TONE ANALYSIS", flush=True)
        voice_probs, voice_emotion = voice_worker.get_probs()
        voice_state = voice_worker.get_state()
        voice_active = voice_state.get('speech_active', False)
        voice_inferences = voice_state.get('inference_count', 0)
        voice_skipped = voice_state.get('skipped_inferences', 0)

        print(f" {'ACTIVELY PROCESSING' if voice_active else 'IDLE (no recent speech)'}", flush=True)
        print(f" Result: {voice_emotion}", flush=True)
        print(f" Distribution: Neutral={voice_probs[0]:.3f} | Happy={voice_probs[1]:.3f} | Sad={voice_probs[2]:.3f} | Angry={voice_probs[3]:.3f}", flush=True)
        print(f" Inferences completed: {voice_inferences}", flush=True)
        print(f" Skipped (silence optimization): {voice_skipped}", flush=True)
        efficiency = (voice_inferences / (voice_inferences + voice_skipped) * 100) if (voice_inferences + voice_skipped) > 0 else 0
        print(f" Processing efficiency: {efficiency:.1f}%", flush=True)

        # Step 5: Analyze text sentiment
        print(f"\n[Step 3/4] TEXT SENTIMENT ANALYSIS", flush=True)
        print(f" Using Whisper transcription", flush=True)
        text_analyzer.analyze(transcription)
        text_probs, _ = text_analyzer.get_probs()
        text_emotion = ['Neutral', 'Happy', 'Sad', 'Angry'][int(np.argmax(text_probs))]

        print(f" Result: {text_emotion}", flush=True)
        print(f" Distribution: Neutral={text_probs[0]:.3f} | Happy={text_probs[1]:.3f} | Sad={text_probs[2]:.3f} | Angry={text_probs[3]:.3f}", flush=True)
        print(f" Text length: {len(transcription)} characters", flush=True)

        # Step 6: Calculate fusion with quality-aware weights
        print(f"\n[Step 4/4] MULTI-MODAL FUSION", flush=True)
        base_weights = {
            'face': fusion_engine.alpha_face,
            'voice': fusion_engine.alpha_voice,
            'text': fusion_engine.alpha_text
        }

        # Adjust weights based on quality/confidence
        adjusted_weights = base_weights.copy()
        adjustments_made = []

        # Reduce face weight if quality is poor
        if face_quality < 0.5:
            adjusted_weights['face'] *= 0.7
            adjustments_made.append(f"Face weight reduced (low quality: {face_quality:.3f})")

        # Reduce voice weight if not active
        if not voice_active:
            adjusted_weights['voice'] *= 0.5
            adjustments_made.append(f"Voice weight reduced (no recent speech)")

        # Reduce text weight if very short
        if len(transcription) < 10:
            adjusted_weights['text'] *= 0.7
            adjustments_made.append(f"Text weight reduced (short input: {len(transcription)} chars)")

        # Normalize weights to sum to 1.0
        total_weight = sum(adjusted_weights.values())
        if total_weight > 0:
            final_weights = {k: v/total_weight for k, v in adjusted_weights.items()}
        else:
            # Degenerate configuration (all alphas zero): weight the
            # modalities equally instead of dividing by zero.
            final_weights = {k: 1.0 / len(adjusted_weights) for k in adjusted_weights}

        print(f" Base weights: Face={base_weights['face']:.3f} | Voice={base_weights['voice']:.3f} | Text={base_weights['text']:.3f}", flush=True)
        if adjustments_made:
            print(f" Adjustments:", flush=True)
            for adj in adjustments_made:
                print(f" - {adj}", flush=True)
        print(f" Final weights: Face={final_weights['face']:.3f} | Voice={final_weights['voice']:.3f} | Text={final_weights['text']:.3f}", flush=True)

        # Calculate weighted fusion
        fused_probs = (
            final_weights['face'] * face_probs +
            final_weights['voice'] * voice_probs +
            final_weights['text'] * text_probs
        )
        fused_probs = fused_probs / (np.sum(fused_probs) + 1e-8)

        # NOTE(review): fused_probs (quality-adjusted weights) is only logged
        # below. The emotion and intensity used downstream come from
        # fusion_engine.fuse(), which receives the raw per-modality
        # distributions. Confirm whether fuse() should see the adjusted weights.
        fused_emotion, intensity = fusion_engine.fuse(face_probs, voice_probs, text_probs)

        # Calculate fusion metrics
        agreement_count = sum([
            face_emotion == fused_emotion,
            voice_emotion == fused_emotion,
            text_emotion == fused_emotion
        ])
        agreement_score = agreement_count / 3.0
        all_same = (face_emotion == voice_emotion == text_emotion)
        has_conflict = len({face_emotion, voice_emotion, text_emotion}) == 3

        print(f"\n {'─'*76}", flush=True)
        print(f" FUSION RESULTS:", flush=True)
        print(f" {'─'*76}", flush=True)
        print(f" Input emotions:", flush=True)
        print(f" Face: {face_emotion:7s} (confidence={face_probs[EMOTION_MAP.get(face_emotion, 0)]:.3f}, weight={final_weights['face']:.3f})", flush=True)
        print(f" Voice: {voice_emotion:7s} (confidence={voice_probs[EMOTION_MAP.get(voice_emotion, 0)]:.3f}, weight={final_weights['voice']:.3f})", flush=True)
        print(f" Text: {text_emotion:7s} (confidence={text_probs[EMOTION_MAP.get(text_emotion, 0)]:.3f}, weight={final_weights['text']:.3f})", flush=True)
        print(f" {'─'*76}", flush=True)
        print(f" FUSED EMOTION: {fused_emotion}", flush=True)
        print(f" Intensity: {intensity:.3f}", flush=True)
        print(f" Fused distribution: Neutral={fused_probs[0]:.3f} | Happy={fused_probs[1]:.3f} | Sad={fused_probs[2]:.3f} | Angry={fused_probs[3]:.3f}", flush=True)
        print(f" {'─'*76}", flush=True)
        print(f" Agreement: {agreement_count}/3 modalities ({agreement_score*100:.1f}%)", flush=True)
        if all_same:
            print(f" Status: Perfect agreement - all modalities aligned", flush=True)
        elif has_conflict:
            print(f" Status: Full conflict - weighted fusion resolved disagreement", flush=True)
        else:
            print(f" Status: Partial agreement - majority vote with confidence weighting", flush=True)
        print(f" {'─'*76}", flush=True)

        # ========== LLM INPUT PREPARATION ==========
        print(f"\n[LLM Input] Preparing context for language model...", flush=True)
        user_language = user_preferences.get("language", "en")

        context_prefix = ""
        if user_summary:
            context_prefix = f"[User context for {username}: {user_summary}]\n\n"
            print(f"[LLM Input] - User context: YES ({len(user_summary)} chars)", flush=True)
        else:
            print(f"[LLM Input] - User context: NO (new user)", flush=True)

        # Add language instruction
        if user_language == "nl":
            context_prefix += "[BELANGRIJK: Antwoord ALTIJD in het Nederlands!]\n\n"
            print(f"[LLM Input] - Language: Dutch (Nederlands)", flush=True)
        else:
            context_prefix += "[IMPORTANT: ALWAYS respond in English!]\n\n"
            print(f"[LLM Input] - Language: English", flush=True)

        full_llm_input = context_prefix + transcription

        print(f"[LLM Input] - Fused emotion: {fused_emotion}", flush=True)
        print(f"[LLM Input] - Face emotion: {face_emotion}", flush=True)
        print(f"[LLM Input] - Voice emotion: {voice_emotion}", flush=True)
        print(f"[LLM Input] - Intensity: {intensity:.3f}", flush=True)
        print(f"[LLM Input] - User text: '{transcription}'", flush=True)
        print(f"[LLM Input] - Full prompt length: {len(full_llm_input)} chars", flush=True)
        if len(context_prefix) > 50:
            print(f"[LLM Input] - Context preview: '{context_prefix[:100]}...'", flush=True)

        # Generate LLM response
        print(f"\n[LLM] Generating response...", flush=True)
        response_text = llm_generator.generate_response(
            fused_emotion, face_emotion, voice_emotion,
            full_llm_input, force=True, intensity=intensity
        )
        print(f"[LLM] Response generated: '{response_text}'", flush=True)

        # Save assistant message
        save_message(session_id, "assistant", response_text, fused_emotion)

        # ========== SEND TO AVATAR FOR TTS ==========
        print(f"\n[TTS] Sending to avatar backend...", flush=True)
        try:
            voice_preference = user_preferences.get("voice", "female")
            language_preference = user_preferences.get("language", "en")

            print(f"[TTS] - Voice: {voice_preference}", flush=True)
            print(f"[TTS] - Language: {language_preference}", flush=True)
            print(f"[TTS] - Text: '{response_text}'", flush=True)

            # requests.post is synchronous and may wait up to the 45 s
            # timeout. Run it in the default executor so the event loop stays
            # responsive. Exceptions raised in the worker thread are re-raised
            # by the await and handled by the except clauses below.
            loop = asyncio.get_running_loop()
            avatar_response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    f"{AVATAR_API}/speak",
                    data={
                        "text": response_text,
                        "voice": voice_preference,
                        "language": language_preference
                    },
                    timeout=45
                )
            )
            avatar_response.raise_for_status()
            avatar_data = avatar_response.json()

            print(f"[TTS] Avatar TTS generated", flush=True)
            print(f"[TTS] - Audio URL: {avatar_data.get('audio_url', 'N/A')}", flush=True)
            print(f"[TTS] - Visemes: {len(avatar_data.get('visemes', []))} keyframes", flush=True)

            await websocket.send_json({
                "type": "llm_response",
                "text": response_text,
                "emotion": fused_emotion,
                "intensity": intensity,
                "audio_url": avatar_data.get("audio_url"),
                "visemes": avatar_data.get("visemes")
            })
            print(f"[Pipeline] Complete response sent to {username}", flush=True)

        except requests.exceptions.ConnectionError:
            # Avatar backend unreachable: still deliver the text answer.
            print(f"[TTS] Avatar service not available - sending text-only", flush=True)
            await websocket.send_json({
                "type": "llm_response",
                "text": response_text,
                "emotion": fused_emotion,
                "intensity": intensity,
                "text_only": True
            })

        except Exception as avatar_err:
            # Any other avatar failure (HTTP error, timeout, bad JSON, ...):
            # deliver the text answer and flag the TTS failure to the client.
            print(f"[TTS] Avatar error: {avatar_err}", flush=True)
            await websocket.send_json({
                "type": "llm_response",
                "text": response_text,
                "emotion": fused_emotion,
                "intensity": intensity,
                "error": "Avatar TTS failed",
                "text_only": True
            })

        print(f"{'='*80}\n", flush=True)

    except Exception as e:
        # Top-level boundary for the pipeline: log and swallow so one failed
        # utterance does not propagate to the caller.
        print(f"[Pipeline] Error in emotion processing: {e}", flush=True)
        import traceback
        traceback.print_exc()