AIDA / app /routes /search.py
destinyebuka's picture
fyp
d477942
# ============================================================
# app/routes/search.py - AIDA Reliable Hybrid Search
# ============================================================
"""
BULLETPROOF SEARCH:
1. User types in ANY language with ANY typos
2. AI normalizes to English
3. MongoDB tries first (fast, strict)
4. Qdrant fallback if no results (semantic, fuzzy)
5. Response in user's language
Goal: If property exists, user WILL find it.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from typing import Optional, List
from app.database import get_db
from app.models.listing import Listing
from app.guards.jwt_guard import get_current_user
# Import LLM
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from app.config import settings
from app.ai.agent.validators import JSONValidator
# Import Qdrant hybrid search
from app.ai.services.search_service import search_listings_hybrid
router = APIRouter(tags=["AIDA Search"])
logger = logging.getLogger(__name__)
# LLM
llm = ChatOpenAI(
api_key=settings.DEEPSEEK_API_KEY,
base_url=settings.DEEPSEEK_BASE_URL,
model="deepseek-chat",
temperature=0.2,
)
# ============================================================
# SCHEMAS
# ============================================================
class SearchRequestDto(BaseModel):
query: str = Field(..., min_length=1)
limit: Optional[int] = Field(default=10, ge=1, le=50)
class SearchResponseDto(BaseModel):
success: bool
message: str
data: List[dict]
total: int
search_params: Optional[dict] = None
# ============================================================
# EXTRACTION PROMPT
# ============================================================
EXTRACTION_PROMPT = """You are a multilingual search assistant. Extract search criteria and output in ENGLISH.
User's query: "{query}"
TASK:
1. Understand the query in ANY language
2. FIX ALL TYPOS (especially city names)
3. Translate values to ENGLISH
4. Detect user's language
LOCATION TYPO FIXES (IMPORTANT):
- "clalavi"/"callavi"/"clavai" → "Calavi"
- "cotonoo"/"cotonu"/"kotonoo" → "Cotonou"
- "lagoss"/"laogs"/"lagos" → "Lagos"
- "porto novo"/"portonovo" → "Porto-Novo"
- "abujaa"/"abja" → "Abuja"
LISTING TYPE (translate to English):
- "en vente"/"à vendre"/"for sale" → "sale"
- "à louer"/"for rent"/"location" → "rent"
- "courte durée"/"short stay" → "short-stay"
- "colocataire"/"roommate" → "roommate"
PRICE PARSING:
- "20k"/"20000" → 20000
- "house of 20k" → min_price: 18000, max_price: 22000
- "under 50k" → max_price: 50000
- "above 100k" → min_price: 100000
Return ONLY valid JSON:
{{
"location": string or null,
"min_price": number or null,
"max_price": number or null,
"bedrooms": number or null,
"bathrooms": number or null,
"listing_type": "rent" | "sale" | "short-stay" | "roommate" | null,
"amenities": [],
"user_language": "en" | "fr" | "es" | "pt" | etc.
}}"""
async def extract_search_params(query: str) -> dict:
"""Extract and normalize search parameters."""
try:
prompt = EXTRACTION_PROMPT.format(query=query)
response = await llm.ainvoke([
SystemMessage(content="Extract search params. Fix typos. Translate to English. JSON only."),
HumanMessage(content=prompt)
])
response_text = response.content if hasattr(response, 'content') else str(response)
validation = JSONValidator.extract_and_validate(response_text)
if not validation.is_valid:
return {"user_language": "en"}
logger.info(f"Extracted: {validation.data}")
return validation.data
except Exception as e:
logger.error(f"Extraction error: {e}")
return {"user_language": "en"}
# ============================================================
# MONGODB SEARCH (Fast, Strict)
# ============================================================
async def search_mongodb(params: dict, limit: int = 10) -> list:
"""Fast MongoDB search with exact filters."""
db = await get_db()
query = {"status": "active"}
if params.get("location"):
query["location"] = {"$regex": params["location"], "$options": "i"}
min_price = params.get("min_price")
max_price = params.get("max_price")
if min_price and max_price:
query["price"] = {"$gte": min_price, "$lte": max_price}
elif min_price:
query["price"] = {"$gte": min_price}
elif max_price:
query["price"] = {"$lte": max_price}
if params.get("bedrooms"):
query["bedrooms"] = {"$gte": params["bedrooms"]}
if params.get("listing_type"):
query["listing_type"] = {"$regex": params["listing_type"], "$options": "i"}
if params.get("amenities") and len(params["amenities"]) > 0:
amenity_regex = [{"amenities": {"$regex": a, "$options": "i"}} for a in params["amenities"]]
query["$and"] = amenity_regex
logger.info(f"MongoDB query: {query}")
try:
cursor = db.listings.find(query).sort("created_at", -1).limit(limit)
results = []
async for doc in cursor:
if "_id" in doc:
doc["_id"] = str(doc["_id"])
results.append(doc)
return results
except Exception as e:
logger.error(f"MongoDB error: {e}")
return []
# ============================================================
# QDRANT FALLBACK (Semantic, but still respect location)
# ============================================================
async def search_qdrant_fallback(query: str, params: dict, limit: int = 10) -> list:
"""Semantic search fallback - but STILL filter by location if specified."""
try:
logger.info("Trying Qdrant semantic fallback...")
results, _ = await search_listings_hybrid(
user_query=query,
search_params=params,
limit=limit * 3, # Get more results to filter
mode="relaxed"
)
# IMPORTANT: Filter by location if user specified one
# This prevents returning random listings from other cities
location_filter = params.get("location")
if location_filter and results:
filtered = []
for doc in results:
doc_location = doc.get("location", "")
# Case-insensitive partial match
if location_filter.lower() in doc_location.lower():
filtered.append(doc)
logger.info(f"Qdrant: {len(results)} raw → {len(filtered)} after location filter")
return filtered[:limit]
return results[:limit]
except Exception as e:
logger.error(f"Qdrant fallback error: {e}")
return []
# ============================================================
# RESPONSE GENERATOR
# ============================================================
async def generate_message(query: str, count: int, params: dict, used_fallback: bool) -> str:
"""Generate response in user's language."""
try:
user_lang = params.get("user_language", "en")
location = params.get("location", "")
# Quick templates for common cases (no LLM call needed)
if count == 0:
templates = {
"en": "No properties found. Try adjusting your search.",
"fr": "Aucune propriété trouvée. Essayez d'autres critères.",
"es": "No se encontraron propiedades. Intente otra búsqueda.",
"pt": "Nenhuma propriedade encontrada. Tente outros critérios."
}
return templates.get(user_lang, templates["en"])
# Build response with LLM for natural phrasing
prompt = f"""Generate a 1-sentence search result message.
Count: {count} properties found
Location: {location or 'search area'}
Language: {user_lang}
Examples:
- en: "Found {count} properties in {location}!"
- fr: "Voici {count} propriétés à {location} !"
Write ONLY the message."""
response = await llm.ainvoke([
SystemMessage(content=f"Respond in {user_lang}. One sentence only."),
HumanMessage(content=prompt)
])
return response.content.strip()
except Exception as e:
logger.error(f"Message error: {e}")
return f"Found {count} properties!" if count > 0 else "No properties found."
# ============================================================
# MAIN SEARCH ENDPOINT
# ============================================================
@router.post("/", response_model=SearchResponseDto)
async def aida_search(
dto: SearchRequestDto,
current_user: dict = Depends(get_current_user),
):
"""
RELIABLE HYBRID SEARCH
1. Extract & normalize query (any language → English)
2. Search MongoDB first (fast, strict)
3. If no results → Qdrant fallback (semantic, fuzzy)
4. Respond in user's language
If a property exists, the user WILL find it.
"""
logger.info(f"AIDA Search: {dto.query}")
try:
# Step 1: Extract parameters
search_params = await extract_search_params(dto.query)
# Step 2: Try MongoDB first (fast)
results = await search_mongodb(search_params, dto.limit)
used_fallback = False
# Step 3: If no results, try Qdrant (semantic fallback)
if not results:
logger.info("MongoDB found nothing, trying Qdrant fallback...")
results = await search_qdrant_fallback(dto.query, search_params, dto.limit)
used_fallback = True
# Step 4: Format listings
formatted_listings = []
for doc in results:
if "_id" in doc and not isinstance(doc["_id"], str):
doc["_id"] = str(doc["_id"])
# Clean up internal fields
doc.pop("_relevance_score", None)
doc.pop("_is_suggestion", None)
doc.pop("location_lower", None)
doc.pop("listing_type_lower", None)
try:
listing = Listing(**doc)
formatted_listings.append(listing.model_dump(by_alias=True))
except Exception as e:
logger.warning(f"Format warning: {e}")
formatted_listings.append(doc)
# Step 5: Generate response in user's language
message = await generate_message(dto.query, len(formatted_listings), search_params, used_fallback)
logger.info(f"Search complete: {len(formatted_listings)} results (fallback: {used_fallback})")
return SearchResponseDto(
success=True,
message=message,
data=formatted_listings,
total=len(formatted_listings),
search_params=search_params
)
except Exception as e:
logger.error(f"Search error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Search failed: {str(e)}"
)