Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import math | |
| import pandas as pd | |
| import hashlib | |
| import pickle | |
| from typing import Optional, Dict, Any, List | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from smolagents import Tool, CodeAgent | |
| from smolagents import DuckDuckGoSearchTool, VisitWebpageTool, InferenceClientModel | |
| # Load environment variables | |
| load_dotenv() | |
| # ------------------------- | |
| # Caching System | |
| # ------------------------- | |
class CacheManager:
    """Small on-disk key/value cache backed by pickle files.

    Each key is hashed with MD5 (used only to derive a file name) and the
    value is stored as ``<cache_dir>/<hash>.cache``.

    NOTE(security): entries are read back with ``pickle.load``, which can
    execute arbitrary code. Only point ``cache_dir`` at a directory this
    process controls; never load cache files from an untrusted source.
    """

    def __init__(self, cache_dir: str = ".agent_cache"):
        """Create the cache directory (including missing parents)."""
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested cache location works on first use.
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _hash_key(self, key: str) -> str:
        """Return the hex MD5 digest of *key*, used as the file stem."""
        return hashlib.md5(key.encode()).hexdigest()

    def _path_for(self, key: str) -> Path:
        """Return the cache file path that stores *key*."""
        return self.cache_dir / f"{self._hash_key(key)}.cache"

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or ``None`` on a miss.

        Unreadable or corrupt entries are treated as a miss rather than an
        error, because the cache is strictly best-effort.
        """
        try:
            with open(self._path_for(key), 'rb') as f:
                return pickle.load(f)
        except FileNotFoundError:
            return None
        except Exception:
            # Unpickling can raise almost anything on a damaged file; a
            # broken entry must never take the caller down.
            return None

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key*; failures are ignored (best-effort).

        The value is written to a temporary file and then moved into place,
        so a crash mid-write cannot leave a truncated entry behind.
        """
        cache_file = self._path_for(key)
        tmp_file = cache_file.with_suffix(".tmp")
        try:
            # The directory may have been removed since construction.
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            with open(tmp_file, 'wb') as f:
                pickle.dump(value, f)
            tmp_file.replace(cache_file)
        except Exception:
            # Unpicklable value or I/O problem: drop the partial file.
            try:
                tmp_file.unlink()
            except OSError:
                pass
| # ------------------------- | |
| # Optimized Custom Tools | |
| # ------------------------- | |
class SmartCalculatorTool(Tool):
    """Evaluate a math expression and optionally list the rewrites applied.

    Friendly spellings (``sqrt``, ``pi``, ``^`` ...) are translated to their
    Python equivalents before evaluation. Results are cached on disk.
    """

    name = "smart_calculator"
    description = "Perform mathematical calculations with step-by-step reasoning and memory"
    inputs = {
        "expression": {"type": "string", "description": "Math expression (e.g., '2 + 3 * 4', 'sqrt(16)')"},
        "show_steps": {"type": "boolean", "description": "Whether to show calculation steps", "default": False, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def _translate(self, expr: str):
        """Rewrite friendly tokens to Python syntax.

        Returns:
            A ``(python_expression, steps)`` tuple where *steps* describes
            each distinct rewrite that was applied.
        """
        replacements = {
            'sin': 'math.sin', 'cos': 'math.cos', 'tan': 'math.tan',
            'log': 'math.log', 'ln': 'math.log', 'sqrt': 'math.sqrt',
            'pi': 'math.pi', 'e': 'math.e',
            'floor': 'math.floor', 'ceil': 'math.ceil',
            'factorial': 'math.factorial', 'power': '**'
        }
        steps = []

        def _swap(match):
            old = match.group(1)
            new = replacements[old]
            step = f"Replaced '{old}' with '{new}'"
            if step not in steps:
                steps.append(step)
            return new

        # Match whole tokens only: not inside another identifier or number
        # ("ceil", "1e5") and not already qualified ("math.sin").
        token_pattern = (
            r"(?<![\w.])("
            + "|".join(sorted(replacements, key=len, reverse=True))
            + r")(?!\w)"
        )
        expr = re.sub(token_pattern, _swap, expr)
        if '^' in expr:
            expr = expr.replace('^', '**')
            steps.append("Replaced '^' with '**'")
        return expr, steps

    def forward(self, expression: str, show_steps: bool = False) -> str:
        """Evaluate *expression* and return the result as a string.

        Args:
            expression: The math expression to evaluate.
            show_steps: When true, prefix the result with the list of token
                rewrites. ``None`` is treated as ``False``.

        Returns:
            The result, a ``[CACHED]``-prefixed earlier result, or a
            ``Calculation error: ...`` message.
        """
        show_steps = bool(show_steps)  # nullable input may arrive as None
        cache_key = f"calc:{expression}:{show_steps}"
        cached = self.cache.get(cache_key)
        if cached is not None:
            return f"[CACHED] {cached}"
        try:
            original_expr = expression.strip()
            expr, steps = self._translate(original_expr)
            # NOTE(security): eval() with emptied __builtins__ is NOT a
            # sandbox. Dunder access is rejected to block the usual
            # attribute-walking escapes, but this should be replaced by a
            # real expression parser before exposure to untrusted input.
            if "__" in expr:
                return "Calculation error: double underscores are not allowed"
            allowed_names = {
                'math': math, 'abs': abs, 'min': min, 'max': max,
                'round': round, 'sum': sum, 'pow': pow
            }
            result = eval(expr, {"__builtins__": {}}, allowed_names)
            if show_steps and steps:
                response = "Steps:\n" + "\n".join(f"- {step}" for step in steps)
                response += f"\n\nCalculation: {original_expr} = {result}"
            else:
                response = str(result)
            self.cache.set(cache_key, response)
            return response
        except Exception as e:
            return f"Calculation error: {str(e)}"
class ProgressivePDFTool(Tool):
    """Extract a bounded amount of text from selected pages of a PDF."""

    name = "progressive_pdf_reader"
    description = "Extract text from PDF with progressive loading and smart summarization"
    inputs = {
        "file_path": {"type": "string", "description": "Path to the PDF"},
        "pages": {"type": "string", "description": "Pages to extract (e.g., '1-5' or 'all')", "default": "1-3", "nullable": True},
        "summary": {"type": "boolean", "description": "Enable smart summarization", "default": True, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    @staticmethod
    def _parse_page_range(pages, total_pages: int):
        """Turn a page spec into a zero-based half-open ``(start, end)`` range.

        Accepts ``'all'``, a single 1-based page ``'N'`` or a range ``'N-M'``.
        ``None`` or an empty string falls back to ``'1-3'``.

        Raises:
            ValueError: If the spec is not one of the accepted forms.
        """
        spec = (pages or "1-3").strip().lower()
        if spec == "all":
            return 0, total_pages
        first, dash, last = spec.partition("-")
        try:
            start_page = max(0, int(first) - 1)
            if dash:
                end_page = min(total_pages, int(last))
            else:
                end_page = min(total_pages, start_page + 1)
        except ValueError:
            raise ValueError(
                f"Invalid page selection '{pages}'; use 'all', 'N' or 'N-M'"
            ) from None
        # Never report a negative span when the request is out of range.
        return start_page, max(start_page, end_page)

    def forward(self, file_path: str, pages: str = "1-3", summary: bool = True) -> str:
        """Return up to 500 characters of text per selected page.

        Args:
            file_path: Path of the PDF to read.
            pages: ``'all'``, ``'N'`` or ``'N-M'`` (1-based, inclusive).
            summary: Append a short note when more than three pages had
                text. ``None`` is treated as the default ``True``.

        Returns:
            The extracted text, a ``[CACHED]``-prefixed earlier result, or a
            ``PDF reading error: ...`` message.
        """
        summary = True if summary is None else summary
        cache_key = f"pdf:{file_path}:{pages}:{summary}"
        cached = self.cache.get(cache_key)
        if cached is not None:
            # Return the whole cached text; truncating here would make a
            # repeated call less useful than the first one.
            return f"[CACHED] {cached}"
        try:
            import PyPDF2
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                total_pages = len(reader.pages)
                start_page, end_page = self._parse_page_range(pages, total_pages)
                extracted_pages = []
                for i in range(start_page, end_page):
                    text = reader.pages[i].extract_text() or ""
                    if text.strip():
                        # Limit per page; mark truncation only when it happened.
                        ellipsis = "..." if len(text) > 500 else ""
                        extracted_pages.append(f"Page {i+1}: {text[:500]}{ellipsis}")
            result = f"PDF Summary ({end_page - start_page} pages extracted):\n" + "\n".join(extracted_pages)
            if summary and len(extracted_pages) > 3:
                result += f"\n\n[SMART SUMMARY: Document has {total_pages} total pages, showing first {end_page - start_page} pages with key content]"
            self.cache.set(cache_key, result)
            return result
        except Exception as e:
            return f"PDF reading error: {str(e)}"
class EfficientCSVAnalyzerTool(Tool):
    """Summarise a CSV file from at most ``max_rows`` leading rows."""

    name = "efficient_csv_analyzer"
    description = "Analyze CSV with smart sampling and statistical insights"
    inputs = {
        "file_path": {"type": "string", "description": "Path to the CSV"},
        "analysis_type": {"type": "string", "description": "Type of analysis (basic/stats/sample/custom)", "default": "basic", "nullable": True},
        "max_rows": {"type": "integer", "description": "Max rows to analyze for large files", "default": 1000, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    @staticmethod
    def _count_rows(file_path: str, fallback: int) -> int:
        """Count data rows by streaming one column in chunks.

        Avoids loading the full file into memory. Returns *fallback* if the
        file cannot be re-read.
        """
        try:
            chunks = pd.read_csv(file_path, usecols=[0], chunksize=100_000)
            return sum(len(chunk) for chunk in chunks)
        except Exception:
            return fallback

    def forward(self, file_path: str, analysis_type: str = "basic", max_rows: int = 1000) -> str:
        """Analyse the first ``max_rows`` rows of a CSV file.

        Args:
            file_path: Path of the CSV file.
            analysis_type: ``basic``, ``stats`` or ``sample``; any other
                value produces the dtype/missing-value report. ``None`` is
                treated as ``basic``.
            max_rows: Number of leading rows to load. ``None`` is treated
                as the default of 1000.

        Returns:
            The report text, a ``[CACHED]``-prefixed earlier report, or a
            ``CSV analysis error: ...`` message.
        """
        analysis_type = analysis_type or "basic"
        max_rows = 1000 if max_rows is None else max_rows
        cache_key = f"csv:{file_path}:{analysis_type}:{max_rows}"
        cached = self.cache.get(cache_key)
        if cached is not None:
            return f"[CACHED] {cached}"
        try:
            df = pd.read_csv(file_path, nrows=max_rows)
            if len(df) < max_rows:
                # The sample already holds the whole file.
                actual_total = len(df)
            else:
                actual_total = self._count_rows(file_path, fallback=len(df))
            if analysis_type == "basic":
                result = f"CSV Analysis:\nShape: {df.shape}\nColumns: {list(df.columns)}\nSample data:\n{df.head().to_string()}"
            elif analysis_type == "stats":
                # "number" covers every numeric width, not just int64/float64.
                numeric_cols = df.select_dtypes(include="number").columns
                if len(numeric_cols) > 0:
                    result = f"Statistical Analysis:\nShape: {df.shape}\nNumerical columns: {list(numeric_cols)}\n{df[numeric_cols].describe().to_string()}"
                else:
                    result = f"No numerical columns found. Available columns: {list(df.columns)}"
            elif analysis_type == "sample":
                result = f"Sample Analysis (first {len(df)} rows of {actual_total} total):\nColumns: {list(df.columns)}\nSample:\n{df.head(10).to_string()}"
            else:
                result = f"Custom Analysis:\nShape: {df.shape}\nData types:\n{df.dtypes.to_string()}\nMissing values:\n{df.isnull().sum().to_string()}"
            if actual_total > max_rows:
                result += f"\n\n[NOTE: Analyzed sample of {max_rows} rows from {actual_total} total rows]"
            self.cache.set(cache_key, result)
            return result
        except Exception as e:
            return f"CSV analysis error: {str(e)}"
class AdvancedReasoningTool(Tool):
    """Produce a templated reasoning outline for a question.

    The tool does not solve anything itself; it returns text describing how
    the question could be approached under the chosen strategy.
    """

    name = "advanced_reasoning"
    description = "Advanced reasoning with step-by-step logic and multiple reasoning strategies"
    inputs = {
        "question": {"type": "string", "description": "Question to reason about"},
        "strategy": {"type": "string", "description": "Reasoning strategy (decomposition/analogical/critical)", "default": "decomposition", "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def forward(self, question: str, strategy: str = "decomposition") -> str:
        """Return the reasoning outline for *question*.

        Args:
            question: The question to outline.
            strategy: ``decomposition``, ``analogical`` or ``critical``.
                ``None`` or an unknown value falls back to decomposition.

        Returns:
            The outline, a ``[CACHED]``-prefixed earlier outline, or a
            ``Reasoning error: ...`` message.
        """
        strategy = strategy or "decomposition"
        cache_key = f"reason:{question}:{strategy}"
        cached = self.cache.get(cache_key)
        if cached is not None:
            return f"[CACHED] {cached}"
        strategies = {
            "decomposition": self._decomposition_reasoning,
            "analogical": self._analogical_reasoning,
            "critical": self._critical_reasoning,
        }
        reason = strategies.get(strategy, self._decomposition_reasoning)
        try:
            result = reason(question)
        except Exception as e:
            return f"Reasoning error: {str(e)}"
        # Store the outline so the cache lookup above can actually hit.
        self.cache.set(cache_key, result)
        return result

    def _decomposition_reasoning(self, question: str) -> str:
        """Split *question* into sub-questions and label its cue words."""
        # Whole-word cues only, so "if" does not fire inside "different".
        patterns = {
            r'\b(?:and|also|additionally|furthermore)\b': 'conjunction',
            r'\b(?:because|since|due to)\b': 'causal',
            r'\b(?:if|then|when)\b': 'conditional',
            r'\b(?:what is|who is|where is)\b': 'identification',
            r'\b(?:why|how)\b': 'explanatory'
        }
        detected_patterns = [
            type_name
            for pattern, type_name in patterns.items()
            if re.search(pattern, question, re.IGNORECASE)
        ]

        sub_questions = []
        # Split compound questions on whole-word conjunctions.
        parts = re.split(
            r'\b(?:and|also|additionally|furthermore)\b',
            question,
            flags=re.IGNORECASE,
        )
        parts = [p.strip() for p in parts if p.strip()]
        if len(parts) > 1:
            sub_questions = parts

        # Numbered lists ("1. ... 2. ...") take precedence. The marker must
        # be followed by whitespace so decimals such as 3.14 are ignored,
        # and DOTALL lets an item run across line breaks.
        numbered_items = re.findall(
            r'(?:^|(?<=\s))\d+\.\s+(.+?)(?=\s+\d+\.\s|\s*$)',
            question,
            flags=re.DOTALL,
        )
        if len(numbered_items) > 1:
            sub_questions = [item.strip() for item in numbered_items]

        pattern_label = ', '.join(detected_patterns) or 'no specific'
        if sub_questions:
            result = f"Multi-part reasoning detected ({pattern_label} patterns):\n"
            for i, sub_q in enumerate(sub_questions, 1):
                result += f"{i}. Analyze: {sub_q}\n"
        else:
            result = f"Single reasoning question with {pattern_label} patterns. "
            result += f"Approach: Break down '{question}' into logical components and solve step by step."
        return result

    def _analogical_reasoning(self, question: str) -> str:
        """Return the fixed analogical-reasoning template for *question*."""
        return f"Analogical reasoning for: {question}\nApproach: Find similar patterns, cases, or principles that can be applied."

    def _critical_reasoning(self, question: str) -> str:
        """Return the fixed critical-reasoning template for *question*."""
        return f"Critical reasoning for: {question}\nApproach: Examine assumptions, evaluate evidence, consider alternative perspectives."
class FileSystemTool(Tool):
    """List files under a directory that match a glob pattern."""

    name = "file_system_explorer"
    description = "Explore file system efficiently with smart filtering and file operations"
    inputs = {
        "path": {"type": "string", "description": "Directory path to explore"},
        "pattern": {"type": "string", "description": "File pattern to match (e.g., '*.py', '*.txt')", "default": "*", "nullable": True},
        "max_depth": {"type": "integer", "description": "Maximum directory depth", "default": 3, "nullable": True}
    }
    output_type = "string"

    def forward(self, path: str, pattern: str = "*", max_depth: int = 3) -> str:
        """Return a listing of matching files with their sizes.

        Args:
            path: Directory to search recursively.
            pattern: Glob pattern; ``None`` is treated as ``'*'``.
            max_depth: Maximum number of directory levels below *path*;
                ``None`` is treated as the default of 3.

        Returns:
            A listing of at most 20 files in sorted order (with a note when
            more exist), or a message describing why nothing was listed.
        """
        max_listed = 20
        pattern = pattern or "*"
        max_depth = 3 if max_depth is None else max_depth
        try:
            dir_path = Path(path)
            if not dir_path.exists():
                return f"Path does not exist: {path}"
            files = []
            try:
                # Sorted so the output is stable across runs and platforms.
                for file_path in sorted(dir_path.rglob(pattern)):
                    if not file_path.is_file():
                        continue
                    relative_path = file_path.relative_to(dir_path)
                    depth = len(relative_path.parts) - 1
                    if depth <= max_depth:
                        size = file_path.stat().st_size
                        files.append(f"{relative_path} ({size} bytes)")
            except Exception as e:
                return f"Error exploring {path}: {str(e)}"
            if not files:
                return f"No files found matching pattern '{pattern}' in {path}"
            listing = (
                f"File system exploration:\nPath: {path}\nPattern: {pattern}\nMax depth: {max_depth}\nFiles found:\n"
                + "\n".join(files[:max_listed])
            )
            hidden = len(files) - max_listed
            if hidden > 0:
                # Make the truncation visible instead of silently dropping files.
                listing += f"\n... and {hidden} more files not shown"
            return listing
        except Exception as e:
            return f"File system error: {str(e)}"
class WebAPIConnectorTool(Tool):
    """Placeholder API connector that returns canned descriptions.

    No network request is made; every supported ``api_type`` maps to a
    fixed "[Mock data]" message.
    """

    name = "web_api_connector"
    description = "Connect to various APIs for data retrieval and processing"
    inputs = {
        "api_type": {"type": "string", "description": "API type (weather, currency, news, stock)"},
        "query": {"type": "string", "description": "API query/parameters"},
        "format": {"type": "string", "description": "Output format (summary/json)", "default": "summary", "nullable": True}
    }
    output_type = "string"

    def forward(self, api_type: str, query: str, format: str = "summary") -> str:
        """Return the mock response for *api_type*.

        Args:
            api_type: One of weather, currency, news, stock (case-insensitive).
            query: Text echoed into the mock response.
            format: Accepted for interface compatibility; currently unused.

        Returns:
            The mock message, or a "not supported" message listing the
            available types.
        """
        # No HTTP client is imported here: the responses are static, so the
        # tool must keep working when `requests` is not installed.
        mock_responses = {
            "weather": f"Weather data for {query}: [Mock data] Temperature, humidity, conditions available via weather API",
            "currency": f"Currency data for {query}: [Mock data] Exchange rates, trends available via currency API",
            "news": f"News data for {query}: [Mock data] Recent headlines, articles available via news API",
            "stock": f"Stock data for {query}: [Mock data] Prices, market data available via stock API",
        }
        key = (api_type or "").strip().lower()
        if key in mock_responses:
            return mock_responses[key]
        return f"API type '{api_type}' not supported. Available: weather, currency, news, stock"
class WikipediaTool(Tool):
    """Fetch a short Wikipedia summary for a topic, with on-disk caching."""

    name = "wikipedia_search"
    description = "Search Wikipedia for information on any topic"
    inputs = {
        "query": {"type": "string", "description": "Topic to search on Wikipedia"},
        "num_sentences": {"type": "integer", "description": "Number of sentences to return", "default": 3, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def forward(self, query: str, num_sentences: int = 3) -> str:
        """Return a summary of *query* from Wikipedia.

        Args:
            query: Topic to look up.
            num_sentences: Number of sentences requested; ``None`` is
                treated as the default of 3.

        Returns:
            The summary, a ``[CACHED]``-prefixed earlier summary, a list of
            disambiguation options, or a ``Wikipedia search error: ...``
            message.
        """
        num_sentences = 3 if num_sentences is None else num_sentences
        cache_key = f"wiki:{query}:{num_sentences}"
        cached = self.cache.get(cache_key)
        if cached is not None:
            return f"[CACHED] {cached}"
        # Import on its own: the except clauses below reference
        # `wikipedia.exceptions`, which would itself fail with an unbound
        # name if the import had failed inside the same try block.
        try:
            import wikipedia
        except ImportError as e:
            return f"Wikipedia search error: {str(e)}"
        try:
            result = wikipedia.summary(query, sentences=num_sentences)
        except wikipedia.exceptions.DisambiguationError as e:
            options = e.options[:3]
            return f"Multiple options found: {', '.join(options)}. Please be more specific."
        except Exception as e:
            return f"Wikipedia search error: {str(e)}"
        self.cache.set(cache_key, result)
        return result
class ImageProcessorTool(Tool):
    """Report basic properties (size, mode, format) of an image file."""

    name = "image_processor"
    description = "Process and analyze images with basic operations"
    inputs = {
        "image_path": {"type": "string", "description": "Path to the image file"},
        "operation": {"type": "string", "description": "Operation (analyze/resize/info)", "default": "analyze", "nullable": True}
    }
    output_type = "string"

    def forward(self, image_path: str, operation: str = "analyze") -> str:
        """Describe the image at *image_path*.

        Args:
            image_path: Path of the image file.
            operation: ``info`` for a one-line description, ``resize`` for a
                note about the current size (no resizing is performed); any
                other value returns the multi-line analysis.

        Returns:
            The description, or a message when the file is missing or
            cannot be processed.
        """
        try:
            from PIL import Image
            if not os.path.exists(image_path):
                return f"Image file not found: {image_path}"
            # Read what is needed inside the context manager so the
            # underlying file handle is released right away.
            with Image.open(image_path) as img:
                size, mode, image_format = img.size, img.mode, img.format
            if operation == "info":
                return f"Image info: {size} pixels, mode: {mode}, format: {image_format}"
            if operation == "resize":
                # Only reports what a resize would start from.
                return f"Resize operation for {image_path}: Current size {size}. Would resize to specified dimensions."
            return f"Image analysis for {image_path}:\nSize: {size}\nMode: {mode}\nFormat: {image_format}\nBasic image properties extracted."
        except Exception as e:
            return f"Image processing error: {str(e)}"
| # ------------------------- | |
| # Intelligent Tool Router | |
| # ------------------------- | |
class IntelligentToolRouter:
    """Keyword-based heuristics that map a query to tool names."""

    def __init__(self, tools: "List[Tool]"):
        """Index *tools* by name and set up the keyword tables."""
        self.tools = {tool.name: tool for tool in tools}
        # Keywords are matched at the start of a word (see _mentions).
        # Arithmetic operators are detected separately by _has_arithmetic
        # rather than as bare characters, which matched any hyphen or slash.
        self.keywords = {
            'math': ['calculate', 'compute', 'solve', 'equation', 'formula', 'sum', 'product', 'area', 'radius', 'sqrt'],
            'pdf': ['pdf', 'document', 'read pdf'],
            'csv': ['csv', 'excel', 'spreadsheet', 'data analysis'],
            'reasoning': ['why', 'explain', 'reason', 'logic', 'conclusion', 'infer', 'deduce', 'think through'],
            'files': ['file', 'directory', 'folder', 'explore', 'list'],
            'api': ['api', 'weather', 'currency', 'news', 'stock', 'data service'],
            'image': ['image', 'picture', 'photo', 'visual'],
            'wikipedia': ['wikipedia', 'who is', 'what is', 'where is', 'history', 'biography']
        }
        # Category -> registered tool name.
        self.tool_mapping = {
            'math': 'smart_calculator',
            'pdf': 'progressive_pdf_reader',
            'csv': 'efficient_csv_analyzer',
            'reasoning': 'advanced_reasoning',
            'files': 'file_system_explorer',
            'api': 'web_api_connector',
            'image': 'image_processor',
            'wikipedia': 'wikipedia_search'
        }

    @staticmethod
    def _mentions(text: str, keyword: str) -> bool:
        """Return True if *keyword* starts a word in *text*.

        Anchoring at a word start keeps "api" from matching "capital" while
        still accepting inflections such as "files" or "calculated".
        """
        return re.search(r"\b" + re.escape(keyword), text) is not None

    @staticmethod
    def _has_arithmetic(text: str) -> bool:
        """Return True if *text* contains ``<digit> <operator> <digit>``."""
        return re.search(r"\d\s*[\+\-\*\/x\^]\s*\d", text) is not None

    def get_best_tool(self, query: str) -> Optional[str]:
        """Return the tool name with the most keyword hits, or ``None``.

        Ties are resolved in favour of the category listed first in
        ``self.keywords``.
        """
        query_lower = query.lower()
        scores = {}
        for tool_category, keywords in self.keywords.items():
            score = sum(1 for keyword in keywords if self._mentions(query_lower, keyword))
            if tool_category == 'math' and self._has_arithmetic(query_lower):
                score += 1
            tool_name = self.tool_mapping.get(tool_category)
            if score > 0 and tool_name is not None:
                scores[tool_name] = score
        if scores:
            return max(scores, key=scores.get)
        return None

    def suggest_tools(self, query: str) -> List[str]:
        """Return every tool name whose trigger words appear in *query*."""
        query_lower = query.lower()

        def mentions_any(words):
            return any(self._mentions(query_lower, word) for word in words)

        suggestions = []
        # Mathematical expressions or explicit calculation requests.
        if self._has_arithmetic(query_lower) or mentions_any(['calculate', 'solve']):
            suggestions.append('smart_calculator')
        # File-oriented tools.
        if mentions_any(['pdf']):
            suggestions.append('progressive_pdf_reader')
        if mentions_any(['csv']):
            suggestions.append('efficient_csv_analyzer')
        if mentions_any(['file']):
            suggestions.append('file_system_explorer')
        # Encyclopedic look-ups.
        if mentions_any(['who is', 'what is', 'where is', 'wikipedia', 'history', 'biography']):
            suggestions.append('wikipedia_search')
        return suggestions
| # ------------------------- | |
| # Optimized Smolagents GAIA Agent | |
| # ------------------------- | |
class OptimizedSmolagentsGAIAgent:
    """Agent that classifies a question, builds a prompt and runs a CodeAgent."""

    def __init__(self):
        """Create the tools, the router, the model and the CodeAgent."""
        self.calculator = SmartCalculatorTool()
        self.pdf_reader = ProgressivePDFTool()
        self.csv_analyzer = EfficientCSVAnalyzerTool()
        self.reasoning = AdvancedReasoningTool()
        self.file_system = FileSystemTool()
        self.web_api = WebAPIConnectorTool()
        self.image_processor = ImageProcessorTool()
        self.wikipedia = WikipediaTool()
        self.web_search = DuckDuckGoSearchTool()
        self.visit_webpage = VisitWebpageTool()
        self.tools = [
            self.calculator,
            self.pdf_reader,
            self.csv_analyzer,
            self.reasoning,
            self.file_system,
            self.web_api,
            self.image_processor,
            self.wikipedia,
            self.web_search,
            self.visit_webpage
        ]
        self.tool_router = IntelligentToolRouter(self.tools)
        # None when HF_TOKEN is missing or the client cannot be created.
        self.model = self._initialize_model()
        self.agent = CodeAgent(
            tools=self.tools,
            model=self.model,
            max_steps=8,  # Reduced from 10 for efficiency
            verbosity_level=0  # Reduced verbosity for token efficiency
        )
        self.cache = CacheManager()

    def _initialize_model(self):
        """Return an InferenceClientModel, or ``None`` if unavailable.

        Reads the access token from the ``HF_TOKEN`` environment variable.
        """
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            print("HF_TOKEN not found. Please set it in environment variables")
            return None
        try:
            model = InferenceClientModel(
                model_id="allenai/Olmo-3-32B-Think",
                token=hf_token
            )
            print("Using HuggingFace model")
            return model
        except Exception as e:
            print(f"Error initializing HuggingFace model: {e}")
            return None

    def classify_question(self, question: str) -> Dict[str, Any]:
        """Score *question* per category and pick the primary one.

        Returns:
            A dict with integer scores under ``mathematical``,
            ``file_processing`` and ``reasoning``, the router's
            ``suggested_tools``, and ``primary_type`` (the highest-scoring
            category; ties go to the first of the three in that order).
        """
        q_lower = question.lower()
        suggested_tools = self.tool_router.suggest_tools(question)

        def mentions(keyword: str) -> bool:
            # Anchor at a word start so "file" does not match "profile".
            return re.search(r"\b" + re.escape(keyword), q_lower) is not None

        math_keywords = ['calculate', 'compute', 'solve', 'equation', 'formula', 'sum', 'product', 'area', 'radius', 'sqrt']
        math_score = sum(1 for keyword in math_keywords if mentions(keyword))
        # Require digit-operator-digit; a digit followed by a space
        # ("first 3 pages") is not an expression.
        if re.search(r'\d\s*[\+\-\*\/x\^]\s*\d', q_lower):
            math_score += 2

        file_score = 0
        if mentions('pdf'):
            file_score += 2
        if mentions('csv'):
            file_score += 2
        if any(mentions(word) for word in ['file', 'document', 'spreadsheet']):
            file_score += 1

        reasoning_keywords = ['why', 'explain', 'reason', 'logic', 'conclusion', 'infer', 'deduce', 'think']
        reasoning_score = sum(1 for keyword in reasoning_keywords if mentions(keyword))

        scores = {
            'mathematical': math_score,
            'file_processing': file_score,
            'reasoning': reasoning_score,
        }
        return {
            **scores,
            'suggested_tools': suggested_tools,
            'primary_type': max(scores, key=scores.get),
        }

    def _build_prompt(self, question: str, classification: Dict[str, Any]) -> str:
        """Build the agent prompt that matches the question's classification."""
        q_lower = question.lower()
        suggested_tools = classification.get('suggested_tools', [])
        if classification['mathematical'] > 0:
            prompt = f"Solve this mathematical problem efficiently: {question}\nUse the smart_calculator tool for precise calculations."
        elif classification['file_processing'] > 0:
            if 'pdf' in q_lower:
                prompt = f"Process this PDF-related question efficiently: {question}\nUse progressive_pdf_reader for optimal results."
            elif 'csv' in q_lower:
                prompt = f"Process this CSV-related question efficiently: {question}\nUse efficient_csv_analyzer for smart analysis."
            else:
                prompt = f"Process this file-related question efficiently: {question}"
        elif 'wikipedia_search' in suggested_tools or any(word in q_lower for word in ['who is', 'what is', 'where is', 'wikipedia']):
            prompt = f"Search Wikipedia for information: {question}\nUse wikipedia_search for accurate information."
        elif classification['reasoning'] > 0:
            prompt = f"Provide structured reasoning for this question: {question}\nUse advanced_reasoning for step-by-step analysis."
        else:
            prompt = f"Find accurate information efficiently for this question: {question}"
        if suggested_tools:
            prompt += f"\nSuggested tools: {', '.join(suggested_tools)}"
        prompt += "\n[OPTIMIZED: Process efficiently with minimal token usage]"
        return prompt

    @staticmethod
    def _wrap_code_blocks(text: str) -> str:
        """Wrap code found in *text* in ``<code>`` tags.

        Fenced blocks (```` ``` ```` or ```` ```python ````) get their
        content wrapped in place, keeping the fences. Otherwise, if any line
        starts like Python code (``def``, ``import``, ``from ... import`` or
        an assignment), the whole text is wrapped. Plain prose is returned
        unchanged.
        """
        fenced = re.compile(r"(```(?:python)?\n)(.*?)(\n```)", re.DOTALL)
        if fenced.search(text):
            # Substitute at the matched position only; a global string
            # replace would also rewrite identical text elsewhere.
            return fenced.sub(
                lambda m: f"{m.group(1)}<code>{m.group(2).strip()}</code>{m.group(3)}",
                text,
            )
        code_line = re.compile(
            r"^\s*(?:def\s+\w+|import\s+\w+|from\s+\w+.*\bimport\b|[A-Za-z_]\w*\s*=(?!=))",
            re.MULTILINE,
        )
        if code_line.search(text):
            return f"<code>{text}</code>"
        return text

    def process_question(self, question: str) -> str:
        """Answer *question* with the agent, using the on-disk cache.

        Returns:
            The agent's answer (code wrapped in ``<code>`` tags), a
            ``[CACHED]``-prefixed earlier answer, or an error message when
            no model is available or the agent run fails.
        """
        if not self.model:
            return "Error: No language model available. Set HF_TOKEN."
        cache_key = f"question:{question}"
        cached_result = self.cache.get(cache_key)
        if cached_result is not None:
            return f"[CACHED] {cached_result}"
        try:
            classification = self.classify_question(question)
            prompt = self._build_prompt(question, classification)
            result_str = self._wrap_code_blocks(str(self.agent.run(prompt)))
            self.cache.set(cache_key, result_str)
            return result_str
        except Exception as e:
            return f"Agent processing error: {str(e)}"

    def get_tool_recommendations(self, question: str) -> List[str]:
        """Get tool recommendations for a given question"""
        return self.tool_router.suggest_tools(question)

    def clear_cache(self):
        """Delete every cached entry in the agent's cache directory."""
        import shutil
        cache_dir = self.cache.cache_dir
        if cache_dir.exists():
            shutil.rmtree(cache_dir)
            # Recreate the directory so later cache writes have a target.
            cache_dir.mkdir(parents=True, exist_ok=True)
            print("Cache cleared successfully")
| # ------------------------- | |
| # Test the optimized agent | |
| # ------------------------- | |
if __name__ == "__main__":
    # Smoke run: feed a fixed set of questions through the agent and print
    # the router's recommendations plus the first 200 characters of each
    # answer.
    agent = OptimizedSmolagentsGAIAgent()
    test_questions = (
        "What is the capital of France?",
        "Calculate 15 + 27 * 3",
        "Who wrote Romeo and Juliet?",
        "What is the square root of 144?",
        "Explain why the sky is blue",
        "Analyze the sales data in sales.csv",
        "Read the first 3 pages of document.pdf",
        "Explore the current directory for Python files",
    )
    separator = "-" * 50

    print("=== OPTIMIZED SMOLAGENTS AGENT TEST ===\n")
    for number, question in enumerate(test_questions, start=1):
        print(f"Test {number}: {question}")
        recommendations = agent.get_tool_recommendations(question)
        if recommendations:
            print(f"Tool recommendations: {recommendations}")
        answer = agent.process_question(question)
        print(f"Answer: {answer[:200]}...")
        print(separator)

    # Tell the operator where cached answers live.
    cache_location = Path('.agent_cache')
    print("\nCache management:")
    print(f"Cache directory: {cache_location}")
    print("Use agent.clear_cache() to clear cache if needed")