#!/usr/bin/env python3
"""
Enhanced OpenRouter SAAP Agent - Cost-Efficient Models
OpenAI, Anthropic, and Meta models via OpenRouter with role-specific assignment and cost tracking
Author: Hanan Wandji Danga
"""
import os
from dotenv import load_dotenv
import aiohttp
import json
import time
import asyncio
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime

# Load environment variables
load_dotenv()

logger = logging.getLogger(__name__)

class EnhancedOpenRouterAgent:
    """
    Enhanced OpenRouter Agent with cost-efficient model selection
    Optimized for performance and cost tracking
    """

    def __init__(self, agent_name: str, role: str, api_key: str):
        self.agent_name = agent_name
        self.role = role
        self.api_key = api_key
        self.base_url = "https://openrouter.ai/api/v1"

        # Cost-Efficient Model Assignment by Role
        self.role_model_mapping = {
            "Coordinator": {
                "model": "openai/gpt-4o-mini",  # $0.15/1M tokens - Fast coordination
                "max_tokens": 800,
                "temperature": 0.7
            },
            "Developer": {
                "model": "anthropic/claude-3-haiku",  # $0.25/1M tokens - Code expertise
                "max_tokens": 1200,
                "temperature": 0.5
            },
            "Medical": {
                "model": "openai/gpt-4o-mini",  # $0.15/1M tokens - Accurate but cost-efficient
                "max_tokens": 1000,
                "temperature": 0.3
            },
            "Legal": {
                "model": "openai/gpt-4o-mini",  # $0.15/1M tokens - Precise legal analysis
                "max_tokens": 1000,
                "temperature": 0.3
            },
            "Analyst": {
                "model": "meta-llama/llama-3.2-3b-instruct:free",  # FREE - Data analysis
                "max_tokens": 600,
                "temperature": 0.6
            },
            "Fallback": {
                "model": "meta-llama/llama-3.2-3b-instruct:free",  # FREE - Backup
                "max_tokens": 400,
                "temperature": 0.7
            }
        }

        # Model cost tracking (cost per 1M tokens)
        self.model_costs = {
            "openai/gpt-4o-mini": 0.15,
            "anthropic/claude-3-haiku": 0.25,
            "meta-llama/llama-3.2-3b-instruct:free": 0.0,
            "openai/gpt-3.5-turbo": 0.50,
            "mistral/mistral-7b-instruct:free": 0.0
        }

        # Get model config for this role
        self.model_config = self.role_model_mapping.get(role, self.role_model_mapping["Fallback"])
        self.model_name = self.model_config["model"]

        # Agent Context
        self.context = self._initialize_context()

        logger.info(
            f"🌐 {agent_name} ({role}) initialized with {self.model_name} "
            f"(${self.model_costs.get(self.model_name, 0)}/1M tokens)"
        )

    def _initialize_context(self) -> str:
        """Role-specific context for optimal performance"""
        contexts = {
            "Coordinator": """You are Jane Alesi, the lead AI architect of SAAP. You coordinate multi-agent systems and help with:
- Agent orchestration and workflow management
- Technical architecture decisions
- Team coordination between developers and specialists
- Performance optimization of agent communications
Respond precisely and with a focus on coordination tasks.""",
            "Developer": """You are John Alesi, an advanced software developer for AGI systems. You specialize in:
- Python/Node.js backend development
- FastAPI and database integration
- Agent communication protocols
- Code optimization and debugging
Respond with concrete, implementable solutions.""",
            "Medical": """You are Lara Alesi, a medical AI expert. You help with:
- Medical questions and diagnostic support
- Healthcare compliance and standards
- Medical data analysis
- Healthcare-specific AI applications
Respond precisely and on a sound scientific basis.""",
            "Legal": """You are Justus Alesi, a legal expert for Germany, Switzerland, and the EU. You help with:
- GDPR compliance and data protection
- Legal assessment of AI systems
- Contract law and licensing
- Regulatory requirements
Respond with sound legal reasoning and appropriate caution.""",
            "Analyst": """You are a SAAP Analyst Agent. You specialize in:
- Data analysis and performance metrics
- System monitoring and optimization potential
- Requirements engineering and use case analysis
- Benchmarking and comparative studies
Respond in a data-driven, analytical manner."""
        }
        return contexts.get(self.role, contexts["Analyst"])

    async def send_request(self, prompt: str, track_costs: bool = True) -> Dict[str, Any]:
        """
        Send a request to OpenRouter with enhanced cost tracking
        """
        start_time = time.time()

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "HTTP-Referer": "https://saap.satware.com",  # Optional, for tracking
            "X-Title": f"SAAP {self.role} Agent"  # For the OpenRouter dashboard
        }

        payload = {
            "model": self.model_name,
            "messages": [
                {"role": "system", "content": self.context},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": self.model_config["max_tokens"],
            "temperature": self.model_config["temperature"],
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=45)
                ) as response:
                    response_time = time.time() - start_time

                    if response.status == 200:
                        data = await response.json()
                        response_text = data['choices'][0]['message']['content']
                        usage = data.get('usage', {})

                        # Token usage reported by OpenRouter
                        total_tokens = usage.get('total_tokens', 0)
                        prompt_tokens = usage.get('prompt_tokens', 0)
                        completion_tokens = usage.get('completion_tokens', 0)

                        # Estimate cost using a single blended per-model rate applied to
                        # total tokens (a simplification of separate prompt/completion pricing)
                        cost_per_1m_tokens = self.model_costs.get(self.model_name, 0)
                        estimated_cost = (total_tokens / 1_000_000) * cost_per_1m_tokens

                        # Performance metrics
                        tokens_per_second = total_tokens / response_time if response_time > 0 else 0
                        cost_per_second = estimated_cost / response_time if response_time > 0 else 0

                        result = {
                            "success": True,
                            "response": response_text,
                            "performance_metrics": {
                                "response_time": round(response_time, 3),
                                "tokens_per_second": round(tokens_per_second, 2),
                                "cost_per_second": round(cost_per_second, 6)
                            },
                            "usage_metrics": {
                                "prompt_tokens": prompt_tokens,
                                "completion_tokens": completion_tokens,
                                "total_tokens": total_tokens
                            },
                            "cost_metrics": {
                                "estimated_cost_usd": round(estimated_cost, 6),
                                "cost_per_1m_tokens": cost_per_1m_tokens,
                                "model_name": self.model_name,
                                "is_free_model": cost_per_1m_tokens == 0
                            },
                            "agent_info": {
                                "agent_name": self.agent_name,
                                "role": self.role,
                                "provider": "OpenRouter"
                            },
                            "timestamp": datetime.utcnow().isoformat()
                        }

                        if track_costs:
                            logger.info(
                                f"💰 Cost Efficiency - {self.agent_name}: "
                                f"{response_time:.2f}s, {total_tokens} tokens, "
                                f"${estimated_cost:.6f} ({self.model_name})"
                            )

                        return result

                    elif response.status == 429:
                        # Rate limit - retry on the free fallback model
                        logger.warning(f"⚠️ Rate limit hit for {self.model_name}, switching to free model")
                        return await self._fallback_to_free_model(prompt, track_costs)

                    else:
                        error_text = await response.text()
                        error_result = {
                            "success": False,
                            "error": f"HTTP {response.status}: {error_text}",
                            "response_time": round(response_time, 3),
                            "model": self.model_name,
                            "timestamp": datetime.utcnow().isoformat()
                        }
                        return error_result

        except asyncio.TimeoutError:
            error_result = {
                "success": False,
                "error": "Request timeout (45s)",
                "response_time": 45.0,
                "model": self.model_name,
                "timestamp": datetime.utcnow().isoformat()
            }
            logger.error(f"⏰ Timeout for {self.agent_name}")
            return error_result

        except Exception as e:
            error_result = {
                "success": False,
                "error": f"Request failed: {str(e)}",
                "response_time": round(time.time() - start_time, 3),
                "model": self.model_name,
                "timestamp": datetime.utcnow().isoformat()
            }
            logger.error(f"❌ OpenRouter Error for {self.agent_name}: {e}")
            return error_result

    async def _fallback_to_free_model(self, prompt: str, track_costs: bool) -> Dict[str, Any]:
        """Fall back to a free model when rate limited"""
        original_model = self.model_name
        free_model = "meta-llama/llama-3.2-3b-instruct:free"

        # Guard against unbounded retries if the free model itself is rate limited
        if original_model == free_model:
            return {"success": False, "error": "Rate limited on free fallback model",
                    "model": original_model, "timestamp": datetime.utcnow().isoformat()}

        self.model_name = free_model
        logger.info(f"🔄 Fallback: {original_model} → {self.model_name}")

        result = await self.send_request(prompt, track_costs)

        # Restore original model for next request
        self.model_name = original_model

        if result["success"]:
            result["cost_metrics"]["fallback_used"] = True
            result["cost_metrics"]["original_model"] = original_model

        return result

    async def health_check(self) -> Dict[str, Any]:
        """Health check with cost efficiency metrics"""
        try:
            test_prompt = "Reply with just 'OK' to confirm SAAP agent connectivity."
            result = await self.send_request(test_prompt, track_costs=False)

            return {
                "agent_name": self.agent_name,
                "role": self.role,
                "provider": "OpenRouter",
                "model": self.model_name,
                "status": "healthy" if result["success"] else "unhealthy",
                "response_time": result.get("performance_metrics", {}).get("response_time", 0),
                "cost_per_1m_tokens": self.model_costs.get(self.model_name, 0),
                "is_free_model": self.model_costs.get(self.model_name, 0) == 0,
                "error": result.get("error") if not result["success"] else None,
                "timestamp": datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {
                "agent_name": self.agent_name,
                "role": self.role,
                "provider": "OpenRouter",
                "status": "error",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat()
            }

# Utility functions for SAAP integration
def create_agent_by_role(role: str, agent_name: str, api_key: str) -> EnhancedOpenRouterAgent:
    """Create optimized OpenRouter agent by role"""
    return EnhancedOpenRouterAgent(agent_name, role, api_key)


def get_cost_efficient_model_for_role(role: str) -> Dict[str, Any]:
    """Get the most cost-efficient model recommendation for a role"""
    mapping = EnhancedOpenRouterAgent("temp", role, "temp").role_model_mapping
    return mapping.get(role, mapping["Fallback"])

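
# Illustrative usage sketch (not exercised by the demo below): a minimal example of
# combining the two helpers above. It assumes OPENROUTER_API_KEY is set in the
# environment, as in the demo; the "Developer" role and "john_alesi" name are taken
# from this file, and this helper function itself is hypothetical.
async def example_role_lookup_and_health_check() -> None:
    config = get_cost_efficient_model_for_role("Developer")
    print(f"Recommended model for Developer: {config['model']} "
          f"(max_tokens={config['max_tokens']}, temperature={config['temperature']})")

    api_key = os.getenv("OPENROUTER_API_KEY")
    if api_key:
        agent = create_agent_by_role("Developer", "john_alesi", api_key)
        status = await agent.health_check()
        print(f"Health: {status['status']} via {status['model']}")
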
if __name__ == "__main__":
    async def demo_cost_efficient_agents():
        """Demo cost-efficient agents with tracking"""
        print("💰 OpenRouter Cost-Efficient Models Demo")
        print("=" * 50)

        # Load API key from environment variable
        API_KEY = os.getenv("OPENROUTER_API_KEY")
        if not API_KEY:
            print("❌ Error: OPENROUTER_API_KEY not set in environment variables")
            print("Please set it in the backend/.env file:")
            print("OPENROUTER_API_KEY=sk-or-v1-your-actual-key-here")
            return

        # Create agents for different roles
        agents = [
            EnhancedOpenRouterAgent("jane_alesi", "Coordinator", API_KEY),
            EnhancedOpenRouterAgent("john_alesi", "Developer", API_KEY),
            EnhancedOpenRouterAgent("lara_alesi", "Medical", API_KEY),
            EnhancedOpenRouterAgent("analyst_agent", "Analyst", API_KEY)
        ]

        test_prompt = "Explain in 2 sentences the main advantages of your role in a multi-agent system."

        total_cost = 0
        total_time = 0

        for agent in agents:
            print(f"\n🤖 Testing {agent.agent_name} ({agent.role})...")
            print(f" Model: {agent.model_name}")

            result = await agent.send_request(test_prompt)

            if result["success"]:
                metrics = result["performance_metrics"]
                cost = result["cost_metrics"]["estimated_cost_usd"]
                print(f" ✅ Response: {result['response'][:80]}...")
                print(f" ⏱️ Time: {metrics['response_time']}s")
                print(f" 💰 Cost: ${cost:.6f}")
                print(f" 🔥 Speed: {metrics['tokens_per_second']:.1f} tokens/s")
                total_cost += cost
                total_time += metrics['response_time']
            else:
                print(f" ❌ Error: {result['error']}")

        print(f"\n📊 Total Performance:")
        print(f" 💰 Total Cost: ${total_cost:.6f}")
        print(f" ⏱️ Total Time: {total_time:.2f}s")
        print(f" 💡 Average Cost per Agent: ${total_cost/len(agents):.6f}")

    asyncio.run(demo_cost_efficient_agents())
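
    # Optional follow-up (kept commented out so the demo's behavior is unchanged):
    # run the illustrative helper defined above for a single-agent connectivity check.
    # asyncio.run(example_role_lookup_and_health_check())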