# huggingface_ai_final / smolagents_agent.py
# Hugging Face file-view header (page residue kept as a comment):
#   uploaded by alfulanny - commit "Update smolagents_agent.py" (ec3aa0f, verified) - 31 kB
import os
import re
import math
import pandas as pd
import hashlib
import pickle
from typing import Optional, Dict, Any, List
from pathlib import Path
from dotenv import load_dotenv
from smolagents import Tool, CodeAgent
from smolagents import DuckDuckGoSearchTool, VisitWebpageTool, InferenceClientModel
# Load environment variables
load_dotenv()
# -------------------------
# Caching System
# -------------------------
class CacheManager:
    """Best-effort on-disk cache storing one pickled value per key.

    Each key is hashed to a file name inside ``cache_dir``. Caching is an
    optimisation only, so read and write failures are swallowed on purpose:
    a failed read behaves like a miss and a failed write is a no-op.
    """

    def __init__(self, cache_dir: str = ".agent_cache"):
        """Create the cache directory (including missing parents).

        Args:
            cache_dir: Directory that holds the ``*.cache`` files.
        """
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested location such as "var/cache/agent" works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _hash_key(self, key: str) -> str:
        """Return the hex MD5 digest of ``key`` (used as a file name only)."""
        return hashlib.md5(key.encode()).hexdigest()

    def _cache_file(self, key: str) -> Path:
        """Return the path of the cache file that belongs to ``key``."""
        return self.cache_dir / f"{self._hash_key(key)}.cache"

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None`` on a miss.

        A missing, unreadable or corrupt cache file is treated as a miss.
        """
        cache_file = self._cache_file(key)
        try:
            with open(cache_file, 'rb') as f:
                # NOTE(security): pickle.load executes arbitrary code from the
                # file; the cache directory must only be writable by trusted
                # users.
                return pickle.load(f)
        except FileNotFoundError:
            return None
        except Exception:
            # Deliberate best-effort boundary: corrupt or stale entries must
            # never break the caller.
            return None

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``; failures are silently ignored.

        The value is pickled into a temporary file that is then moved over the
        final name, so a failed dump (e.g. an unpicklable value) neither
        corrupts nor discards the entry that was stored before.
        """
        cache_file = self._cache_file(key)
        tmp_file = cache_file.with_suffix(".tmp")
        try:
            with open(tmp_file, 'wb') as f:
                pickle.dump(value, f)
            os.replace(tmp_file, cache_file)
        except Exception:
            # Best-effort: drop the partial temp file and keep the old entry.
            try:
                tmp_file.unlink()
            except OSError:
                pass
# -------------------------
# Optimized Custom Tools
# -------------------------
class SmartCalculatorTool(Tool):
    """Evaluate a math expression given as text and return the result as text.

    Friendly names such as ``sqrt`` or ``pi`` are mapped onto the ``math``
    module before evaluation, and successful results are cached on disk.
    """

    name = "smart_calculator"
    description = "Perform mathematical calculations with step-by-step reasoning and memory"
    inputs = {
        "expression": {"type": "string", "description": "Math expression (e.g., '2 + 3 * 4', 'sqrt(16)')"},
        "show_steps": {"type": "boolean", "description": "Whether to show calculation steps", "default": False, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def _translate(self, expr: str):
        """Rewrite friendly names in ``expr`` into evaluable Python.

        Only whole identifiers are replaced. Plain substring replacement would
        corrupt input: the ``e`` constant would be substituted inside ``ceil``,
        ``power`` or the float literal ``1e5``, and ``tan`` inside ``atan``.

        Returns:
            A ``(translated_expression, steps)`` tuple, where ``steps`` lists
            the replacements that were actually applied.
        """
        replacements = {
            'sin': 'math.sin', 'cos': 'math.cos', 'tan': 'math.tan',
            'log': 'math.log', 'ln': 'math.log', 'sqrt': 'math.sqrt',
            'pi': 'math.pi', 'e': 'math.e', '^': '**',
            'floor': 'math.floor', 'ceil': 'math.ceil',
            'factorial': 'math.factorial', 'power': '**'
        }
        applied = set()

        def substitute(match):
            token = match.group(0)
            if token not in replacements:
                return token
            applied.add(token)
            return replacements[token]

        # The lookbehind skips identifiers glued to a digit, a letter or a dot,
        # so "1e5", "0x10" and an already qualified "math.sin" stay untouched.
        translated = re.sub(r"(?<![\w.])[A-Za-z_]\w*", substitute, expr)
        if '^' in translated:
            translated = translated.replace('^', '**')
            applied.add('^')

        steps = [
            f"Replaced '{old}' with '{new}'"
            for old, new in replacements.items()
            if old in applied
        ]
        return translated, steps

    def forward(self, expression: str, show_steps: bool = False) -> str:
        """Evaluate ``expression`` and return the result (or an error message).

        Args:
            expression: Expression text, e.g. ``"2 + 3 * 4"`` or ``"sqrt(16)"``.
            show_steps: When true and replacements were applied, list them
                before the result.

        Returns:
            The result as a string, prefixed with ``[CACHED]`` on a cache hit,
            or ``"Calculation error: ..."`` when evaluation fails.
        """
        cache_key = f"calc:{expression}:{show_steps}"
        cached = self.cache.get(cache_key)
        if cached:
            return f"[CACHED] {cached}"
        try:
            original_expr = expression.strip()
            expr, steps = self._translate(original_expr)
            allowed_names = {
                'math': math, 'abs': abs, 'min': min, 'max': max,
                'round': round, 'sum': sum, 'pow': pow
            }
            # NOTE(security): eval with emptied builtins is NOT a sandbox;
            # attribute access on the allowed objects can still reach
            # arbitrary code. Only feed this tool trusted expressions.
            result = eval(expr, {"__builtins__": {}}, allowed_names)
            if show_steps and steps:
                response = "Steps:\n" + "\n".join(f"- {step}" for step in steps)
                response += f"\n\nCalculation: {original_expr} = {result}"
            else:
                response = str(result)
            self.cache.set(cache_key, response)
            return response
        except Exception as e:
            return f"Calculation error: {str(e)}"
class ProgressivePDFTool(Tool):
    """Extract a bounded amount of text from selected pages of a PDF file."""

    name = "progressive_pdf_reader"
    description = "Extract text from PDF with progressive loading and smart summarization"
    inputs = {
        "file_path": {"type": "string", "description": "Path to the PDF"},
        "pages": {"type": "string", "description": "Pages to extract (e.g., '1-5' or 'all')", "default": "1-3", "nullable": True},
        "summary": {"type": "boolean", "description": "Enable smart summarization", "default": True, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def _page_bounds(self, pages: str, total_pages: int):
        """Convert a page spec into a zero-based ``(start, end)`` half-open range.

        Accepts ``"all"``, a 1-based range ``"a-b"`` or a single 1-based page.
        Both bounds are clamped to the document so that ``end >= start`` always
        holds and the reported page count can never be negative.

        Raises:
            ValueError: If the spec does not contain valid integers.
        """
        spec = pages.strip().lower()
        if spec == "all":
            return 0, total_pages
        if "-" in spec:
            first, last = spec.split("-")
            start_page = int(first) - 1
            end_page = int(last)
        else:
            start_page = int(spec) - 1
            end_page = start_page + 1
        start_page = min(max(0, start_page), total_pages)
        end_page = max(start_page, min(total_pages, end_page))
        return start_page, end_page

    def forward(self, file_path: str, pages: str = "1-3", summary: bool = True) -> str:
        """Return up to 500 characters of text for each requested page.

        Args:
            file_path: Path of the PDF to read.
            pages: ``"all"``, a range such as ``"1-5"`` or a single page number.
            summary: When true and more than three pages produced text, append
                a short note about the document size.

        Returns:
            The extracted text, prefixed with ``[CACHED]`` on a cache hit, or
            ``"PDF reading error: ..."`` on failure.
        """
        # Both inputs are declared nullable, so fall back to the defaults.
        if pages is None:
            pages = "1-3"
        if summary is None:
            summary = True
        cache_key = f"pdf:{file_path}:{pages}:{summary}"
        cached = self.cache.get(cache_key)
        if cached:
            # Return the full cached text, like the other tools do; truncating
            # it here would make repeated calls lose most of the content.
            return f"[CACHED] {cached}"
        try:
            import PyPDF2
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                total_pages = len(reader.pages)
                start_page, end_page = self._page_bounds(pages, total_pages)
                extracted_pages = []
                for i in range(start_page, end_page):
                    text = reader.pages[i].extract_text() or ""
                    if text.strip():
                        # Limit per page; mark with "..." only when text was cut.
                        marker = "..." if len(text) > 500 else ""
                        extracted_pages.append(f"Page {i+1}: {text[:500]}{marker}")
            result = f"PDF Summary ({end_page - start_page} pages extracted):\n" + "\n".join(extracted_pages)
            if summary and len(extracted_pages) > 3:
                result += f"\n\n[SMART SUMMARY: Document has {total_pages} total pages, showing first {end_page - start_page} pages with key content]"
            self.cache.set(cache_key, result)
            return result
        except Exception as e:
            return f"PDF reading error: {str(e)}"
class EfficientCSVAnalyzerTool(Tool):
    """Summarise a CSV file from a bounded sample of its first rows."""

    name = "efficient_csv_analyzer"
    description = "Analyze CSV with smart sampling and statistical insights"
    inputs = {
        "file_path": {"type": "string", "description": "Path to the CSV"},
        "analysis_type": {"type": "string", "description": "Type of analysis (basic/stats/sample/custom)", "default": "basic", "nullable": True},
        "max_rows": {"type": "integer", "description": "Max rows to analyze for large files", "default": 1000, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def _count_rows(self, file_path: str, fallback: int) -> int:
        """Count the data rows of the CSV without loading it all at once.

        Only the first column is parsed, in chunks, so memory stays bounded
        for large files. Returns ``fallback`` if the file cannot be re-read.
        """
        try:
            chunks = pd.read_csv(file_path, usecols=[0], chunksize=50000)
            return sum(len(chunk) for chunk in chunks)
        except Exception:
            return fallback

    def forward(self, file_path: str, analysis_type: str = "basic", max_rows: int = 1000) -> str:
        """Analyse the first ``max_rows`` rows of the CSV at ``file_path``.

        Args:
            file_path: Path of the CSV file.
            analysis_type: ``"basic"``, ``"stats"`` or ``"sample"``; any other
                value produces the dtype / missing-value report.
            max_rows: Maximum number of rows loaded for the analysis.

        Returns:
            The report text, prefixed with ``[CACHED]`` on a cache hit, or
            ``"CSV analysis error: ..."`` on failure.
        """
        # Both inputs are declared nullable, so fall back to the defaults.
        if analysis_type is None:
            analysis_type = "basic"
        if max_rows is None:
            max_rows = 1000
        cache_key = f"csv:{file_path}:{analysis_type}:{max_rows}"
        cached = self.cache.get(cache_key)
        if cached:
            return f"[CACHED] {cached}"
        try:
            # Smart sampling for large files
            df = pd.read_csv(file_path, nrows=max_rows)
            if len(df) < max_rows:
                # The sample already is the whole file; no second pass needed.
                actual_total = len(df)
            else:
                actual_total = self._count_rows(file_path, fallback=len(df))
            if analysis_type == "basic":
                result = f"CSV Analysis:\nShape: {df.shape}\nColumns: {list(df.columns)}\nSample data:\n{df.head().to_string()}"
            elif analysis_type == "stats":
                # "number" selects every numeric dtype, not only float64/int64.
                numeric_cols = df.select_dtypes(include="number").columns
                if len(numeric_cols) > 0:
                    result = f"Statistical Analysis:\nShape: {df.shape}\nNumerical columns: {list(numeric_cols)}\n{df[numeric_cols].describe().to_string()}"
                else:
                    result = f"No numerical columns found. Available columns: {list(df.columns)}"
            elif analysis_type == "sample":
                result = f"Sample Analysis (first {len(df)} rows of {actual_total} total):\nColumns: {list(df.columns)}\nSample:\n{df.head(10).to_string()}"
            else:
                result = f"Custom Analysis:\nShape: {df.shape}\nData types:\n{df.dtypes.to_string()}\nMissing values:\n{df.isnull().sum().to_string()}"
            if actual_total > max_rows:
                result += f"\n\n[NOTE: Analyzed sample of {max_rows} rows from {actual_total} total rows]"
            self.cache.set(cache_key, result)
            return result
        except Exception as e:
            return f"CSV analysis error: {str(e)}"
class AdvancedReasoningTool(Tool):
    """Produce a textual reasoning plan for a question.

    The tool does not answer the question; it returns a scaffold (detected
    sub-questions or a suggested approach) chosen by ``strategy``.
    """

    name = "advanced_reasoning"
    description = "Advanced reasoning with step-by-step logic and multiple reasoning strategies"
    inputs = {
        "question": {"type": "string", "description": "Question to reason about"},
        "strategy": {"type": "string", "description": "Reasoning strategy (decomposition/analogical/critical)", "default": "decomposition", "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def forward(self, question: str, strategy: str = "decomposition") -> str:
        """Return the reasoning scaffold for ``question``.

        Args:
            question: The question to analyse.
            strategy: ``"decomposition"``, ``"analogical"`` or ``"critical"``;
                unknown or missing values fall back to decomposition.

        Returns:
            The scaffold text, prefixed with ``[CACHED]`` on a cache hit, or
            ``"Reasoning error: ..."`` on failure.
        """
        cache_key = f"reason:{question}:{strategy}"
        cached = self.cache.get(cache_key)
        if cached:
            return f"[CACHED] {cached}"
        try:
            if strategy == "analogical":
                result = self._analogical_reasoning(question)
            elif strategy == "critical":
                result = self._critical_reasoning(question)
            else:
                result = self._decomposition_reasoning(question)
            # Store the result so the cache lookup above can actually hit.
            self.cache.set(cache_key, result)
            return result
        except Exception as e:
            return f"Reasoning error: {str(e)}"

    def _decomposition_reasoning(self, question: str) -> str:
        """Split ``question`` into sub-questions and label its linguistic cues."""
        # Cue patterns use word boundaries so that e.g. "if" does not fire on
        # "different" and "how" does not fire on "show".
        patterns = {
            r'\b(?:and|also|additionally|furthermore)\b': 'conjunction',
            r'\b(?:because|since|due to)\b': 'causal',
            r'\b(?:if|then|when)\b': 'conditional',
            r'\b(?:what is|who is|where is)\b': 'identification',
            r'\b(?:why|how)\b': 'explanatory'
        }
        detected_patterns = [
            type_name
            for pattern, type_name in patterns.items()
            if re.search(pattern, question, re.IGNORECASE)
        ]

        # Split compound questions; a single part means there was no real
        # connector, so the question is not treated as multi-part.
        sub_questions = []
        parts = re.split(r'\band\b|\balso\b|\badditionally\b|\bfurthermore\b', question, flags=re.IGNORECASE)
        parts = [p.strip() for p in parts if p.strip()]
        if len(parts) > 1:
            sub_questions = parts

        # Numbered questions ("1. ... 2. ...") take precedence over connectors.
        numbered_pattern = r'(\d+)\.\s*(.+?)(?=\d+\.|$)'
        matches = re.findall(numbered_pattern, question)
        if matches:
            sub_questions = [m[1].strip() for m in matches]

        # Avoid an empty label such as "with  patterns" when nothing matched.
        label = ', '.join(detected_patterns) or 'no specific'
        if sub_questions:
            result = f"Multi-part reasoning detected ({label} patterns):\n"
            result += "".join(
                f"{i}. Analyze: {sub_q}\n" for i, sub_q in enumerate(sub_questions, 1)
            )
        else:
            result = f"Single reasoning question with {label} patterns. "
            result += f"Approach: Break down '{question}' into logical components and solve step by step."
        return result

    def _analogical_reasoning(self, question: str) -> str:
        """Return the fixed analogical-reasoning prompt for ``question``."""
        return f"Analogical reasoning for: {question}\nApproach: Find similar patterns, cases, or principles that can be applied."

    def _critical_reasoning(self, question: str) -> str:
        """Return the fixed critical-reasoning prompt for ``question``."""
        return f"Critical reasoning for: {question}\nApproach: Examine assumptions, evaluate evidence, consider alternative perspectives."
class FileSystemTool(Tool):
    """List files below a directory that match a glob pattern."""

    name = "file_system_explorer"
    description = "Explore file system efficiently with smart filtering and file operations"
    inputs = {
        "path": {"type": "string", "description": "Directory path to explore"},
        "pattern": {"type": "string", "description": "File pattern to match (e.g., '*.py', '*.txt')", "default": "*", "nullable": True},
        "max_depth": {"type": "integer", "description": "Maximum directory depth", "default": 3, "nullable": True}
    }
    output_type = "string"

    def forward(self, path: str, pattern: str = "*", max_depth: int = 3) -> str:
        """Return a listing of at most 20 matching files with their sizes.

        Args:
            path: Directory to search recursively.
            pattern: Glob pattern applied to every level below ``path``.
            max_depth: Files nested deeper than this many sub-directories are
                skipped (0 means only files directly inside ``path``).

        Returns:
            The listing text, or a message describing why nothing is listed.
        """
        # Both inputs are declared nullable, so fall back to the defaults.
        if pattern is None:
            pattern = "*"
        if max_depth is None:
            max_depth = 3
        limit = 20
        try:
            dir_path = Path(path)
            if not dir_path.exists():
                return f"Path does not exist: {path}"
            if not dir_path.is_dir():
                return f"Path is not a directory: {path}"
            files = []
            try:
                # Sorted so the listing (and its truncation) is deterministic.
                for file_path in sorted(dir_path.rglob(pattern)):
                    if not file_path.is_file():
                        continue
                    relative_path = file_path.relative_to(dir_path)
                    depth = len(relative_path.parts) - 1
                    if depth <= max_depth:
                        size = file_path.stat().st_size
                        files.append(f"{relative_path} ({size} bytes)")
            except Exception as e:
                return f"Error exploring {path}: {str(e)}"
            if not files:
                return f"No files found matching pattern '{pattern}' in {path}"
            listing = f"File system exploration:\nPath: {path}\nPattern: {pattern}\nMax depth: {max_depth}\nFiles found:\n" + "\n".join(files[:limit])
            if len(files) > limit:
                # Tell the caller that the listing is incomplete.
                listing += f"\n... and {len(files) - limit} more files not shown"
            return listing
        except Exception as e:
            return f"File system error: {str(e)}"
class WebAPIConnectorTool(Tool):
    """Placeholder API connector that returns canned (mock) responses.

    No network request is made; each supported ``api_type`` yields a fixed
    message that embeds the query.
    """

    name = "web_api_connector"
    description = "Connect to various APIs for data retrieval and processing"
    inputs = {
        "api_type": {"type": "string", "description": "API type (weather, currency, news, stock)"},
        "query": {"type": "string", "description": "API query/parameters"},
        "format": {"type": "string", "description": "Output format (summary/json)", "default": "summary", "nullable": True}
    }
    output_type = "string"

    def forward(self, api_type: str, query: str, format: str = "summary") -> str:
        """Return the mock response for ``api_type``.

        Args:
            api_type: One of ``weather``, ``currency``, ``news`` or ``stock``
                (case-insensitive).
            query: Text embedded in the response.
            format: Accepted for interface compatibility; currently unused.

        Returns:
            The mock response, or a message listing the supported API types.
        """
        # No third-party imports here: the responses are static, so the tool
        # must not fail just because an HTTP client is not installed.
        responses = {
            "weather": f"Weather data for {query}: [Mock data] Temperature, humidity, conditions available via weather API",
            "currency": f"Currency data for {query}: [Mock data] Exchange rates, trends available via currency API",
            "news": f"News data for {query}: [Mock data] Recent headlines, articles available via news API",
            "stock": f"Stock data for {query}: [Mock data] Prices, market data available via stock API",
        }
        kind = str(api_type).lower()
        if kind in responses:
            return responses[kind]
        return f"API type '{api_type}' not supported. Available: weather, currency, news, stock"
class WikipediaTool(Tool):
    """Fetch a short Wikipedia summary for a topic, with on-disk caching."""

    name = "wikipedia_search"
    description = "Search Wikipedia for information on any topic"
    inputs = {
        "query": {"type": "string", "description": "Topic to search on Wikipedia"},
        "num_sentences": {"type": "integer", "description": "Number of sentences to return", "default": 3, "nullable": True}
    }
    output_type = "string"

    def __init__(self):
        super().__init__()
        self.cache = CacheManager()

    def forward(self, query: str, num_sentences: int = 3) -> str:
        """Return the first ``num_sentences`` sentences of the summary.

        Args:
            query: Topic to look up.
            num_sentences: Number of summary sentences to request.

        Returns:
            The summary, prefixed with ``[CACHED]`` on a cache hit; a
            disambiguation hint listing up to three options; or
            ``"Wikipedia search error: ..."`` on any other failure.
        """
        if num_sentences is None:  # input is declared nullable
            num_sentences = 3
        cache_key = f"wiki:{query}:{num_sentences}"
        cached = self.cache.get(cache_key)
        if cached:
            return f"[CACHED] {cached}"
        # Import separately: if it failed inside the try below, evaluating
        # `wikipedia.exceptions...` in the except clause would itself raise.
        try:
            import wikipedia
        except ImportError as e:
            return f"Wikipedia search error: {str(e)}"
        try:
            result = wikipedia.summary(query, sentences=num_sentences)
        except wikipedia.exceptions.DisambiguationError as e:
            options = e.options[:3]
            return f"Multiple options found: {', '.join(options)}. Please be more specific."
        except Exception as e:
            return f"Wikipedia search error: {str(e)}"
        self.cache.set(cache_key, result)
        return result
class ImageProcessorTool(Tool):
    """Report basic properties (size, mode, format) of an image file."""

    name = "image_processor"
    description = "Process and analyze images with basic operations"
    inputs = {
        "image_path": {"type": "string", "description": "Path to the image file"},
        "operation": {"type": "string", "description": "Operation (analyze/resize/info)", "default": "analyze", "nullable": True}
    }
    output_type = "string"

    def forward(self, image_path: str, operation: str = "analyze") -> str:
        """Describe the image at ``image_path``.

        Args:
            image_path: Path of the image file.
            operation: ``"info"`` for a one-line description, ``"resize"`` for
                a description of the current size (no resizing is performed);
                any other value returns the multi-line analysis.

        Returns:
            The description text, or an error message if the file is missing
            or cannot be opened.
        """
        try:
            from PIL import Image
            if not os.path.exists(image_path):
                return f"Image file not found: {image_path}"
            # The context manager closes the underlying file handle promptly.
            with Image.open(image_path) as img:
                size, mode, image_format = img.size, img.mode, img.format
            if operation == "info":
                return f"Image info: {size} pixels, mode: {mode}, format: {image_format}"
            if operation == "resize":
                # Just return info about what resize would do
                return f"Resize operation for {image_path}: Current size {size}. Would resize to specified dimensions."
            return f"Image analysis for {image_path}:\nSize: {size}\nMode: {mode}\nFormat: {image_format}\nBasic image properties extracted."
        except Exception as e:
            return f"Image processing error: {str(e)}"
# -------------------------
# Intelligent Tool Router
# -------------------------
class IntelligentToolRouter:
    """Keyword-based heuristics that map a query onto tool names."""

    # A digit, an arithmetic operator and another digit ("15 + 27", "3x4").
    # Requiring the second operand keeps phrases like "3 pages" or
    # "well-known" from being mistaken for arithmetic.
    _ARITHMETIC = re.compile(r'\d\s*[\+\-\*\/x\^]\s*\d')

    def __init__(self, tools: "List[Tool]"):
        """Index ``tools`` by name and set up the keyword tables.

        Args:
            tools: Tool objects exposing a ``name`` attribute.
        """
        self.tools = {tool.name: tool for tool in tools}
        # Category -> substrings whose presence in the lower-cased query
        # counts one point each for that category.
        self.keywords = {
            'math': ['calculate', 'compute', 'solve', 'equation', 'formula', 'sum', 'product', 'area', 'radius', 'sqrt'],
            'pdf': ['pdf', 'document', 'read pdf'],
            'csv': ['csv', 'excel', 'spreadsheet', 'data analysis'],
            'reasoning': ['why', 'explain', 'reason', 'logic', 'conclusion', 'infer', 'deduce', 'think through'],
            'files': ['file', 'directory', 'folder', 'explore', 'list'],
            'api': ['api', 'weather', 'currency', 'news', 'stock', 'data service'],
            'image': ['image', 'picture', 'photo', 'visual'],
            'wikipedia': ['wikipedia', 'who is', 'what is', 'where is', 'history', 'biography']
        }
        # Category -> name of the tool that handles it.
        self.tool_mapping = {
            'math': 'smart_calculator',
            'pdf': 'progressive_pdf_reader',
            'csv': 'efficient_csv_analyzer',
            'reasoning': 'advanced_reasoning',
            'files': 'file_system_explorer',
            'api': 'web_api_connector',
            'image': 'image_processor',
            'wikipedia': 'wikipedia_search'
        }

    def get_best_tool(self, query: str) -> Optional[str]:
        """Return the tool name with the highest keyword score, or ``None``.

        Ties are resolved in favour of the category listed first in
        ``self.keywords``.
        """
        query_lower = query.lower()
        scores = {}
        for tool_category, keywords in self.keywords.items():
            score = sum(1 for keyword in keywords if keyword in query_lower)
            if tool_category == 'math' and self._ARITHMETIC.search(query_lower):
                score += 1
            tool_name = self.tool_mapping.get(tool_category)
            if score > 0 and tool_name is not None:
                scores[tool_name] = score
        if scores:
            return max(scores, key=scores.get)
        return None

    def suggest_tools(self, query: str) -> List[str]:
        """Return every tool name whose trigger words appear in ``query``.

        The result is ordered: calculator, PDF reader, CSV analyzer, file
        explorer, Wikipedia.
        """
        query_lower = query.lower()
        suggestions = []
        # Check for mathematical expressions
        if self._ARITHMETIC.search(query_lower) or any(word in query_lower for word in ['calculate', 'solve']):
            suggestions.append('smart_calculator')
        # Check for file operations
        if 'pdf' in query_lower:
            suggestions.append('progressive_pdf_reader')
        if 'csv' in query_lower:
            suggestions.append('efficient_csv_analyzer')
        if 'file' in query_lower:
            suggestions.append('file_system_explorer')
        # Check for Wikipedia queries
        if any(word in query_lower for word in ['who is', 'what is', 'where is', 'wikipedia', 'history', 'biography']):
            suggestions.append('wikipedia_search')
        return suggestions
# -------------------------
# Optimized Smolagents GAIA Agent
# -------------------------
class OptimizedSmolagentsGAIAgent:
    """Question-answering agent that wires the custom tools into a CodeAgent.

    Questions are classified with keyword heuristics, turned into a prompt
    that hints at the most suitable tool, executed by the agent and cached.
    """

    def __init__(self):
        """Create the tools, the router, the model, the agent and the cache."""
        # Initialize optimized tools
        self.calculator = SmartCalculatorTool()
        self.pdf_reader = ProgressivePDFTool()
        self.csv_analyzer = EfficientCSVAnalyzerTool()
        self.reasoning = AdvancedReasoningTool()
        self.file_system = FileSystemTool()
        self.web_api = WebAPIConnectorTool()
        self.image_processor = ImageProcessorTool()
        self.wikipedia = WikipediaTool()
        self.web_search = DuckDuckGoSearchTool()
        self.visit_webpage = VisitWebpageTool()
        self.tools = [
            self.calculator,
            self.pdf_reader,
            self.csv_analyzer,
            self.reasoning,
            self.file_system,
            self.web_api,
            self.image_processor,
            self.wikipedia,
            self.web_search,
            self.visit_webpage,
        ]
        # Initialize intelligent router
        self.tool_router = IntelligentToolRouter(self.tools)
        # Initialize Hugging Face model (None when HF_TOKEN is missing)
        self.model = self._initialize_model()
        # Initialize agent with optimizations
        self.agent = CodeAgent(
            tools=self.tools,
            model=self.model,
            max_steps=8,  # Reduced from 10 for efficiency
            verbosity_level=0,  # Reduced verbosity for token efficiency
        )
        # Initialize cache manager
        self.cache = CacheManager()

    def _initialize_model(self):
        """Build the inference model from ``HF_TOKEN``.

        Returns:
            The model instance, or ``None`` when the token is missing or the
            model cannot be created (a message is printed in both cases).
        """
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            print("HF_TOKEN not found. Please set it in environment variables")
            return None
        try:
            model = InferenceClientModel(
                model_id="allenai/Olmo-3-32B-Think",
                token=hf_token,
            )
            print("Using HuggingFace model")
            return model
        except Exception as e:
            print(f"Error initializing HuggingFace model: {e}")
            return None

    def classify_question(self, question: str) -> Dict[str, Any]:
        """Enhanced question classification with confidence scores.

        Returns:
            A dict with the integer scores ``mathematical``,
            ``file_processing`` and ``reasoning``, the router's
            ``suggested_tools`` and ``primary_type`` (the highest-scoring
            category; ties go to the category listed first).
        """
        q_lower = question.lower()
        # Check for direct tool suggestions
        suggested_tools = self.tool_router.suggest_tools(question)

        # Mathematical detection
        math_keywords = ['calculate', 'compute', 'solve', 'equation', 'formula', 'sum', 'product', 'area', 'radius', 'sqrt']
        math_score = sum(1 for keyword in math_keywords if keyword in q_lower)
        # An operator must sit between two digits; "3 pages" is not arithmetic.
        if re.search(r'\d\s*[\+\-\*\/x\^]\s*\d', q_lower):
            math_score += 2

        # File processing detection
        file_score = 0
        if 'pdf' in q_lower:
            file_score += 2
        if 'csv' in q_lower:
            file_score += 2
        if any(word in q_lower for word in ['file', 'document', 'spreadsheet']):
            file_score += 1

        # Reasoning detection
        reasoning_keywords = ['why', 'explain', 'reason', 'logic', 'conclusion', 'infer', 'deduce', 'think']
        reasoning_score = sum(1 for keyword in reasoning_keywords if keyword in q_lower)

        scores = {
            'mathematical': math_score,
            'file_processing': file_score,
            'reasoning': reasoning_score,
        }
        return {
            **scores,
            'suggested_tools': suggested_tools,
            'primary_type': max(scores, key=scores.get),
        }

    def _build_prompt(self, question: str, classification: Dict[str, Any]) -> str:
        """Compose the agent prompt for ``question`` from its classification."""
        q_lower = question.lower()
        suggested_tools = classification.get('suggested_tools', [])
        if classification['mathematical'] > 0:
            prompt = f"Solve this mathematical problem efficiently: {question}\nUse the smart_calculator tool for precise calculations."
        elif classification['file_processing'] > 0:
            if 'pdf' in q_lower:
                prompt = f"Process this PDF-related question efficiently: {question}\nUse progressive_pdf_reader for optimal results."
            elif 'csv' in q_lower:
                prompt = f"Process this CSV-related question efficiently: {question}\nUse efficient_csv_analyzer for smart analysis."
            else:
                prompt = f"Process this file-related question efficiently: {question}"
        elif 'wikipedia_search' in suggested_tools or any(word in q_lower for word in ['who is', 'what is', 'where is', 'wikipedia']):
            prompt = f"Search Wikipedia for information: {question}\nUse wikipedia_search for accurate information."
        elif classification['reasoning'] > 0:
            prompt = f"Provide structured reasoning for this question: {question}\nUse advanced_reasoning for step-by-step analysis."
        else:
            prompt = f"Find accurate information efficiently for this question: {question}"
        # Add tool suggestions to prompt
        if suggested_tools:
            prompt += f"\nSuggested tools: {', '.join(suggested_tools)}"
        # Add efficiency note
        prompt += "\n[OPTIMIZED: Process efficiently with minimal token usage]"
        return prompt

    @staticmethod
    def _wrap_code_blocks(result_str: str) -> str:
        """Wrap code-like fragments of ``result_str`` in ``<code>`` tags.

        Text without a code fence, ``def `` or ``import `` is returned as is.
        """
        if not ("```" in result_str or "def " in result_str or "import " in result_str):
            return result_str
        # Find Python code blocks (either fenced with ```python or ``` or plain code)
        # NOTE(review): with re.DOTALL the third alternative runs from the
        # first assignment/def/import to the end of the text - confirm that
        # this is the intended extent of the wrapping.
        code_pattern = r'```(?:python)?\n(.*?)\n```|```\n(.*?)\n```|([a-zA-Z_][a-zA-Z0-9_]*\s*=\s*.*|def\s+\w+.*|import\s+\w+.*|from\s+\w+.*import.*)'
        matches = re.findall(code_pattern, result_str, re.DOTALL)
        if matches:
            formatted_result = result_str
            for match in matches:
                code_content = (match[0] or match[1] or match[2]).strip()
                # Skip blank fragments: replacing "" would insert tags
                # between every character of the answer.
                if code_content:
                    formatted_result = formatted_result.replace(code_content, f"<code>{code_content}</code>")
            return formatted_result
        # If no code blocks found but response contains code-like patterns, wrap entire response
        if any(keyword in result_str.lower() for keyword in ['def ', 'import ', 'return ', 'print(', '# ']):
            return f"<code>{result_str}</code>"
        return result_str

    def process_question(self, question: str) -> str:
        """Enhanced question processing with intelligent routing.

        Returns:
            The agent's answer as text (prefixed with ``[CACHED]`` on a cache
            hit), or an error message when no model is available or the agent
            run fails.
        """
        if not self.model:
            return "Error: No language model available. Set HF_TOKEN."
        # Check cache first
        cache_key = f"question:{question}"
        cached_result = self.cache.get(cache_key)
        if cached_result:
            return f"[CACHED] {cached_result}"
        try:
            classification = self.classify_question(question)
            prompt = self._build_prompt(question, classification)
            result_str = str(self.agent.run(prompt))
            # Format response for HuggingFace compatibility
            result_str = self._wrap_code_blocks(result_str)
            # Cache the result
            self.cache.set(cache_key, result_str)
            return result_str
        except Exception as e:
            return f"Agent processing error: {str(e)}"

    def get_tool_recommendations(self, question: str) -> List[str]:
        """Get tool recommendations for a given question"""
        return self.tool_router.suggest_tools(question)

    def clear_cache(self):
        """Clear the agent cache.

        The directory is recreated empty afterwards; without it every later
        cache write would fail silently.
        """
        import shutil
        cache_dir = Path(self.cache.cache_dir)
        if cache_dir.exists():
            shutil.rmtree(cache_dir)
            cache_dir.mkdir(parents=True, exist_ok=True)
            print("Cache cleared successfully")
# -------------------------
# Test the optimized agent
# -------------------------
if __name__ == "__main__":
    # Manual smoke test: run a fixed set of sample questions through the agent
    # and print the suggested tools plus the first 200 characters of each answer.
    gaia_agent = OptimizedSmolagentsGAIAgent()
    sample_questions = (
        "What is the capital of France?",
        "Calculate 15 + 27 * 3",
        "Who wrote Romeo and Juliet?",
        "What is the square root of 144?",
        "Explain why the sky is blue",
        "Analyze the sales data in sales.csv",
        "Read the first 3 pages of document.pdf",
        "Explore the current directory for Python files",
    )
    separator = "-" * 50

    print("=== OPTIMIZED SMOLAGENTS AGENT TEST ===\n")
    number = 0
    for sample in sample_questions:
        number += 1
        print(f"Test {number}: {sample}")
        suggested = gaia_agent.get_tool_recommendations(sample)
        if suggested:
            print(f"Tool recommendations: {suggested}")
        reply = gaia_agent.process_question(sample)
        print(f"Answer: {reply[:200]}...")
        print(separator)

    # Closing hints about where cached answers live.
    print("\nCache management:")
    print(f"Cache directory: {Path('.agent_cache')}")
    print("Use agent.clear_cache() to clear cache if needed")