Spaces:
Running
Running
| # ============================================================ | |
| # app/routes/search.py - AIDA Reliable Hybrid Search | |
| # ============================================================ | |
| """ | |
| BULLETPROOF SEARCH: | |
| 1. User types in ANY language with ANY typos | |
| 2. AI normalizes to English | |
| 3. MongoDB tries first (fast, strict) | |
| 4. Qdrant fallback if no results (semantic, fuzzy) | |
| 5. Response in user's language | |
| Goal: If property exists, user WILL find it. | |
| """ | |
| import logging | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, List | |
| from app.database import get_db | |
| from app.models.listing import Listing | |
| from app.guards.jwt_guard import get_current_user | |
| # Import LLM | |
| from langchain_openai import ChatOpenAI | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| from app.config import settings | |
| from app.ai.agent.validators import JSONValidator | |
| # Import Qdrant hybrid search | |
| from app.ai.services.search_service import search_listings_hybrid | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router carrying this module's endpoints; they are tagged "AIDA Search".
router = APIRouter(tags=["AIDA Search"])

# Shared chat model, used by this module both to extract search parameters
# and to phrase the result message. The OpenAI-compatible client is pointed
# at the DeepSeek base URL and key taken from settings.
llm = ChatOpenAI(
    model="deepseek-chat",
    temperature=0.2,
    base_url=settings.DEEPSEEK_BASE_URL,
    api_key=settings.DEEPSEEK_API_KEY,
)
| # ============================================================ | |
| # SCHEMAS | |
| # ============================================================ | |
class SearchRequestDto(BaseModel):
    # Request body accepted by the search handler.

    # Free-text query; required, at least one character long.
    query: str = Field(default=..., min_length=1)

    # Maximum number of listings to return: between 1 and 50, default 10.
    limit: Optional[int] = Field(10, ge=1, le=50)
class SearchResponseDto(BaseModel):
    # Response envelope produced by the search handler.

    # Outcome flag of the search request.
    success: bool

    # Human-readable summary sentence for the user.
    message: str

    # The listings themselves, one dict per listing.
    data: List[dict]

    # Number of listings in `data`.
    total: int

    # Parameters extracted from the query; optional, defaults to None.
    search_params: Optional[dict] = Field(default=None)
| # ============================================================ | |
| # EXTRACTION PROMPT | |
| # ============================================================ | |
# Prompt template for the parameter-extraction LLM call.
# It is rendered with str.format(query=...): "{query}" is the only
# placeholder, and the braces of the JSON example are doubled ("{{" / "}}")
# so that they come out as literal braces after formatting.
EXTRACTION_PROMPT = """You are a multilingual search assistant. Extract search criteria and output in ENGLISH.
User's query: "{query}"
TASK:
1. Understand the query in ANY language
2. FIX ALL TYPOS (especially city names)
3. Translate values to ENGLISH
4. Detect user's language
LOCATION TYPO FIXES (IMPORTANT):
- "clalavi"/"callavi"/"clavai" → "Calavi"
- "cotonoo"/"cotonu"/"kotonoo" → "Cotonou"
- "lagoss"/"laogs"/"lagos" → "Lagos"
- "porto novo"/"portonovo" → "Porto-Novo"
- "abujaa"/"abja" → "Abuja"
LISTING TYPE (translate to English):
- "en vente"/"à vendre"/"for sale" → "sale"
- "à louer"/"for rent"/"location" → "rent"
- "courte durée"/"short stay" → "short-stay"
- "colocataire"/"roommate" → "roommate"
PRICE PARSING:
- "20k"/"20000" → 20000
- "house of 20k" → min_price: 18000, max_price: 22000
- "under 50k" → max_price: 50000
- "above 100k" → min_price: 100000
Return ONLY valid JSON:
{{
"location": string or null,
"min_price": number or null,
"max_price": number or null,
"bedrooms": number or null,
"bathrooms": number or null,
"listing_type": "rent" | "sale" | "short-stay" | "roommate" | null,
"amenities": [],
"user_language": "en" | "fr" | "es" | "pt" | etc.
}}"""
async def extract_search_params(query: str) -> dict:
    """Extract normalized search parameters from a free-text query.

    The query is inserted into ``EXTRACTION_PROMPT``, sent to the module-level
    ``llm``, and the reply is parsed with
    ``JSONValidator.extract_and_validate``.

    Args:
        query: Raw user query, in any language.

    Returns:
        A dict of extracted parameters that always carries a non-empty
        ``user_language`` entry. If the model call fails, or the reply is not
        a valid JSON object, ``{"user_language": "en"}`` is returned instead.

    This function does not raise: failures are logged and mapped to the
    fallback value.
    """
    fallback = {"user_language": "en"}
    try:
        prompt = EXTRACTION_PROMPT.format(query=query)
        response = await llm.ainvoke([
            SystemMessage(content="Extract search params. Fix typos. Translate to English. JSON only."),
            HumanMessage(content=prompt),
        ])
        response_text = response.content if hasattr(response, "content") else str(response)
        validation = JSONValidator.extract_and_validate(response_text)
        extracted = validation.data if validation.is_valid else None
    except Exception as exc:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Extraction error: %s", exc)
        return fallback

    # Callers use dict access (.get) on the result, so a reply that is valid
    # JSON but not an object (for example a list) must not be passed through.
    if not isinstance(extracted, dict):
        logger.warning("Extraction returned no usable JSON object; using defaults")
        return fallback

    # Work on a copy so the validator's own data is left untouched.
    params = dict(extracted)
    # The model may omit the language or send null; default it once here.
    if not params.get("user_language"):
        params["user_language"] = "en"

    logger.info(f"Extracted: {params}")
    return params
| # ============================================================ | |
| # MONGODB SEARCH (Fast, Strict) | |
| # ============================================================ | |
async def search_mongodb(params: dict, limit: int = 10) -> list:
    """Search the ``listings`` collection with filters built from ``params``.

    Args:
        params: Extracted search parameters. The keys read are ``location``,
            ``min_price``, ``max_price``, ``bedrooms``, ``listing_type`` and
            ``amenities``; missing or falsy values add no filter.
        limit: Maximum number of documents to return.

    Returns:
        Matching documents, newest ``created_at`` first, each with ``_id``
        converted to ``str``. An empty list is returned if the query fails.

    Errors raised while obtaining the database handle are not caught here.
    """
    db = await get_db()
    mongo_filter = _build_listing_filter(params)
    logger.info(f"MongoDB query: {mongo_filter}")
    try:
        cursor = db.listings.find(mongo_filter).sort("created_at", -1).limit(limit)
        results = []
        async for doc in cursor:
            if "_id" in doc:
                doc["_id"] = str(doc["_id"])
            results.append(doc)
        return results
    except Exception as exc:
        logger.exception("MongoDB error: %s", exc)
        return []


def _build_listing_filter(params: dict) -> dict:
    """Translate extracted search parameters into a MongoDB filter document."""
    import re  # local import keeps this block self-contained

    def contains(text) -> dict:
        # The text originates from user/LLM output. Escaping it makes the
        # match literal, so characters such as "(", "+" or "." can neither
        # change the meaning of the pattern nor make it invalid.
        return {"$regex": re.escape(str(text)), "$options": "i"}

    mongo_filter: dict = {"status": "active"}

    location = params.get("location")
    if location:
        mongo_filter["location"] = contains(location)

    # Build the price range from whichever bounds are present.
    price_range = {}
    if params.get("min_price"):
        price_range["$gte"] = params["min_price"]
    if params.get("max_price"):
        price_range["$lte"] = params["max_price"]
    if price_range:
        mongo_filter["price"] = price_range

    # "bedrooms" is treated as a minimum, not an exact count.
    if params.get("bedrooms"):
        mongo_filter["bedrooms"] = {"$gte": params["bedrooms"]}
    # NOTE(review): "bathrooms" is extracted by the prompt but is not used
    # as a filter here - confirm whether that is intentional.

    listing_type = params.get("listing_type")
    if listing_type:
        mongo_filter["listing_type"] = contains(listing_type)

    amenities = params.get("amenities") or []
    if isinstance(amenities, str):
        # A bare string would otherwise be iterated character by character.
        amenities = [amenities]
    amenity_clauses = [{"amenities": contains(a)} for a in amenities if a]
    if amenity_clauses:
        # Every requested amenity has to match.
        mongo_filter["$and"] = amenity_clauses

    return mongo_filter
| # ============================================================ | |
| # QDRANT FALLBACK (Semantic, but still respect location) | |
| # ============================================================ | |
async def search_qdrant_fallback(query: str, params: dict, limit: int = 10) -> list:
    """Run the semantic fallback search, still honouring a requested location.

    ``search_listings_hybrid`` is called in ``"relaxed"`` mode for three times
    ``limit`` hits, so that enough remain after the location filter.

    Args:
        query: Original user query text.
        params: Extracted search parameters; ``location`` is used for
            post-filtering when present.
        limit: Maximum number of hits to return.

    Returns:
        At most ``limit`` hits. When a location was requested, only hits whose
        ``location`` contains it (case-insensitive) are kept. An empty list is
        returned if the search fails.
    """
    try:
        logger.info("Trying Qdrant semantic fallback...")
        results, _ = await search_listings_hybrid(
            user_query=query,
            search_params=params,
            limit=limit * 3,  # over-fetch: the location filter below drops hits
            mode="relaxed",
        )
        results = results or []

        # Without this filter the fallback could return listings from other
        # cities than the one the user asked for.
        wanted = str(params.get("location") or "").strip().lower()
        if not wanted or not results:
            return results[:limit]

        # A hit may lack a location or carry None; treat both as "no match"
        # instead of letting one bad document discard every result.
        filtered = [
            doc for doc in results
            if wanted in str(doc.get("location") or "").lower()
        ]
        logger.info(f"Qdrant: {len(results)} raw → {len(filtered)} after location filter")
        return filtered[:limit]
    except Exception as exc:
        logger.exception("Qdrant fallback error: %s", exc)
        return []
| # ============================================================ | |
| # RESPONSE GENERATOR | |
| # ============================================================ | |
async def generate_message(query: str, count: int, params: dict, used_fallback: bool) -> str:
    """Build the one-sentence result message in the user's language.

    When nothing was found a canned translation is returned without calling
    the LLM; otherwise the module-level ``llm`` phrases the sentence.

    Args:
        query: Original user query. Not used by this function; kept so the
            signature stays unchanged.
        count: Number of listings being returned.
        params: Extracted search parameters. ``user_language`` and
            ``location`` are read; either may be missing or ``None``.
        used_fallback: Whether the fallback search produced the results. Not
            used by this function; kept so the signature stays unchanged.

    Returns:
        The message text. This function does not raise: on any failure, or
        when the LLM reply is empty, a plain English default is returned.
    """
    default_message = f"Found {count} properties!" if count > 0 else "No properties found."
    try:
        # The language code comes from LLM output, so tolerate None, odd
        # casing and region-qualified tags such as "fr-FR" or "pt_BR".
        user_lang = str(params.get("user_language") or "").strip().lower() or "en"
        lang_key = user_lang.replace("_", "-").split("-", 1)[0]
        # "or" also covers an explicit None, which .get's default would not.
        location = params.get("location") or ""

        # Quick templates for the empty case (no LLM call needed).
        if count == 0:
            templates = {
                "en": "No properties found. Try adjusting your search.",
                "fr": "Aucune propriété trouvée. Essayez d'autres critères.",
                "es": "No se encontraron propiedades. Intente otra búsqueda.",
                "pt": "Nenhuma propriedade encontrada. Tente outros critérios.",
            }
            return templates.get(lang_key, templates["en"])

        # Only show location-based examples when a location is known, so the
        # model is never handed a sample such as "in None".
        if location:
            examples = (
                f'- en: "Found {count} properties in {location}!"\n'
                f'- fr: "Voici {count} propriétés à {location} !"'
            )
        else:
            examples = (
                f'- en: "Found {count} properties!"\n'
                f'- fr: "Voici {count} propriétés !"'
            )

        prompt = (
            "Generate a 1-sentence search result message.\n"
            f"Count: {count} properties found\n"
            f"Location: {location or 'search area'}\n"
            f"Language: {user_lang}\n"
            "Examples:\n"
            f"{examples}\n"
            "Write ONLY the message."
        )
        response = await llm.ainvoke([
            SystemMessage(content=f"Respond in {user_lang}. One sentence only."),
            HumanMessage(content=prompt),
        ])
        content = getattr(response, "content", response)
        text = content.strip() if isinstance(content, str) else ""
        # An empty or non-text reply is not a usable message.
        return text or default_message
    except Exception as exc:
        logger.exception("Message error: %s", exc)
        return default_message
| # ============================================================ | |
| # MAIN SEARCH ENDPOINT | |
| # ============================================================ | |
# NOTE(review): no route decorator (e.g. @router.post(...)) is visible on this
# handler in this view of the file - confirm it is registered on `router`.
async def aida_search(
    dto: SearchRequestDto,
    current_user: dict = Depends(get_current_user),
):
    """Run the hybrid listing search.

    Steps:
        1. Extract and normalize the query (any language -> English).
        2. Search MongoDB first (fast, strict).
        3. If that finds nothing, try the Qdrant fallback (semantic, fuzzy).
        4. Format the hits and build a message in the user's language.

    Args:
        dto: Request body with the query text and the result limit.
        current_user: Value produced by the ``get_current_user`` dependency.
            It is not read in the body; declaring it makes the dependency run
            for every request.

    Returns:
        A ``SearchResponseDto`` with the formatted listings, their count, the
        message and the extracted search parameters.

    Raises:
        HTTPException: With status 500 when any step fails unexpectedly.
    """
    logger.info(f"AIDA Search: {dto.query}")
    # `limit` is Optional on the DTO, so an explicit null is accepted by
    # validation; substitute the documented default before it reaches the
    # search backends.
    limit = dto.limit if dto.limit is not None else 10
    try:
        # Step 1: Extract parameters
        search_params = await extract_search_params(dto.query)

        # Step 2: Try MongoDB first (fast)
        results = await search_mongodb(search_params, limit)

        # Step 3: If no results, try Qdrant (semantic fallback)
        used_fallback = not results
        if used_fallback:
            logger.info("MongoDB found nothing, trying Qdrant fallback...")
            results = await search_qdrant_fallback(dto.query, search_params, limit)

        # Step 4: Format listings
        formatted_listings = [_format_listing(doc) for doc in results]

        # Step 5: Generate response in user's language
        message = await generate_message(
            dto.query, len(formatted_listings), search_params, used_fallback
        )
        logger.info(
            f"Search complete: {len(formatted_listings)} results (fallback: {used_fallback})"
        )
        return SearchResponseDto(
            success=True,
            message=message,
            data=formatted_listings,
            total=len(formatted_listings),
            search_params=search_params,
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as exc:
        # Keep the details (with traceback) in the server log only; the raw
        # exception text can expose internals and is not sent to the client.
        logger.exception("Search error: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Search failed: internal error",
        ) from exc


def _format_listing(doc: dict) -> dict:
    """Return one search hit as a response-ready dict."""
    if "_id" in doc and not isinstance(doc["_id"], str):
        doc["_id"] = str(doc["_id"])
    # Clean up internal fields that are not part of the response.
    for internal_field in (
        "_relevance_score",
        "_is_suggestion",
        "location_lower",
        "listing_type_lower",
    ):
        doc.pop(internal_field, None)
    try:
        return Listing(**doc).model_dump(by_alias=True)
    except Exception as exc:
        # Best effort: a hit the model rejects is still returned as-is.
        logger.warning(f"Format warning: {exc}")
        return doc