"""
Full Sync Script: MongoDB → Qdrant
===================================
Syncs ALL active listings from MongoDB to Qdrant with complete data including images.
This creates new points in Qdrant for listings that don't exist yet.

Usage:
    python sync_all_listings_to_qdrant.py
"""

import asyncio
import hashlib
import os
from typing import Optional
from uuid import UUID

import httpx
from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from structlog import get_logger

# Load environment variables
load_dotenv()

logger = get_logger(__name__)

# Configuration
MONGO_URI = os.getenv("MONGODB_URL")
if not MONGO_URI:
    raise ValueError("MONGODB_URL environment variable not set in .env file")

MONGO_DB = os.getenv("MONGODB_DATABASE", "lojiz")
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")

if not QDRANT_URL or not QDRANT_API_KEY:
    raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in .env")

if not OPENROUTER_API_KEY:
    raise ValueError("OPENROUTER_API_KEY must be set in .env for embeddings")

COLLECTION_NAME = "listings"
EMBED_MODEL = "qwen/qwen3-embedding-8b"
EMBEDDINGS_URL = "https://openrouter.ai/api/v1/embeddings"


def get_deterministic_uuid(mongo_id: str) -> str:
    """Convert MongoDB ObjectId to deterministic UUID for Qdrant.

    The 16-byte MD5 digest of ``mongo_id`` is used as the UUID bytes, so the
    same ``mongo_id`` always maps to the same point ID.

    Args:
        mongo_id: String form of the listing's ``_id``.

    Returns:
        The UUID as a string.
    """
    # NOTE: MD5 is used purely as an ID mapping here. Changing the hash would
    # change every point ID derived from existing listings.
    hash_bytes = hashlib.md5(mongo_id.encode()).digest()
    return str(UUID(bytes=hash_bytes))


async def _request_embedding(client: httpx.AsyncClient, text: str) -> list:
    """POST ``text`` to the embeddings endpoint and return the first embedding."""
    payload = {
        "model": EMBED_MODEL,
        "input": text,
        "encoding_format": "float",
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    response = await client.post(EMBEDDINGS_URL, json=payload, headers=headers)
    response.raise_for_status()
    data = response.json()
    return data["data"][0]["embedding"]


async def embed_query(text: str, client: Optional[httpx.AsyncClient] = None) -> list:
    """Create embedding using OpenRouter.

    Args:
        text: Text to embed.
        client: Optional ``httpx.AsyncClient`` to reuse across calls. When
            omitted, a temporary client with a 30 second timeout is created
            and closed for this single request.

    Returns:
        The value found at ``data[0].embedding`` in the JSON response.

    Raises:
        httpx.HTTPStatusError: If the API responds with an error status.
    """
    if client is not None:
        return await _request_embedding(client, text)

    async with httpx.AsyncClient(timeout=30) as own_client:
        return await _request_embedding(own_client, text)


def _build_embedding_text(listing: dict) -> str:
    """Compose the text that is embedded for a listing."""
    title = listing.get("title") or ""
    bedrooms = listing.get("bedrooms") or 0
    location = listing.get("location") or ""
    address = listing.get("address") or ""
    description = listing.get("description") or ""
    return f"{title}. {bedrooms}-bed in {location} ({address}). {description}".strip()


def _build_payload(listing: dict, mongo_id: str) -> dict:
    """Build the Qdrant point payload for a MongoDB listing document."""
    location = listing.get("location") or ""
    price_type = listing.get("price_type") or ""
    listing_type = listing.get("listing_type") or listing.get("type") or ""

    return {
        "mongo_id": mongo_id,
        "title": listing.get("title") or "",
        "description": listing.get("description") or "",
        "location": location,
        "location_lower": location.lower(),
        "address": listing.get("address") or "",
        "price": float(listing.get("price") or 0),
        "price_type": price_type,
        "price_type_lower": price_type.lower(),
        "listing_type": listing_type,
        "listing_type_lower": listing_type.lower(),
        "bedrooms": int(listing.get("bedrooms") or 0),
        "bathrooms": int(listing.get("bathrooms") or 0),
        "amenities": [a.lower() for a in (listing.get("amenities") or [])],
        "currency": listing.get("currency", "XOF"),
        "status": listing.get("status", "active"),
        "latitude": listing.get("latitude"),
        "longitude": listing.get("longitude"),
        # `or []` also covers documents that store an explicit null for images.
        "images": listing.get("images") or [],  # ✅ Include images
    }


async def sync_all_listings() -> None:
    """Sync all active listings from MongoDB to Qdrant.

    Every listing with ``status == "active"`` is embedded and upserted into
    ``COLLECTION_NAME`` under a point ID derived from its MongoDB ``_id``.
    A failure on one listing is logged and counted, and the loop continues
    with the next listing. A summary is printed at the end.
    """
    # Connect to MongoDB
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    qdrant_client = None

    try:
        db = mongo_client[MONGO_DB]

        # Connect to Qdrant (inside the try so the Mongo client is still
        # closed if this constructor raises).
        qdrant_client = QdrantClient(
            url=QDRANT_URL,
            api_key=QDRANT_API_KEY,
            timeout=60,
        )

        # Fetch all active listings from MongoDB
        cursor = db.listings.find({"status": "active"})
        listings = await cursor.to_list(length=None)

        total = len(listings)
        logger.info(f"Found {total} active listings to sync")

        synced = 0
        skipped = 0  # Nothing is skipped today; kept so the summary format is stable.
        errors = 0

        # One HTTP client for the whole run instead of one per listing.
        async with httpx.AsyncClient(timeout=30) as http_client:
            for idx, listing in enumerate(listings, 1):
                mongo_id = str(listing["_id"])
                point_id = get_deterministic_uuid(mongo_id)
                title = listing.get("title") or ""

                try:
                    # Build text for embedding
                    text = _build_embedding_text(listing)

                    # Generate embedding
                    print(f"[{idx}/{total}] Generating embedding for: {title[:50]}...")
                    vector = await embed_query(text, client=http_client)

                    # Prepare payload
                    payload = _build_payload(listing, mongo_id)

                    # Upsert point to Qdrant
                    qdrant_client.upsert(
                        collection_name=COLLECTION_NAME,
                        points=[
                            PointStruct(
                                id=point_id,
                                vector=vector,
                                payload=payload,
                            )
                        ],
                    )
                except Exception as e:
                    # Best-effort sync: record the failure and keep going.
                    errors += 1
                    logger.error(
                        f"[{idx}/{total}] ❌ Failed",
                        mongo_id=mongo_id,
                        error=str(e),
                    )
                else:
                    # Counted only after the upsert succeeded, so a listing
                    # can never be tallied as both synced and failed.
                    synced += 1
                    logger.info(
                        f"[{idx}/{total}] ✅ Synced",
                        mongo_id=mongo_id,
                        title=title[:30],
                        images_count=len(payload["images"]),
                    )

        # Summary
        print("\n" + "="*60)
        print("SYNC COMPLETE")
        print("="*60)
        print(f"Total listings: {total}")
        print(f"✅ Synced: {synced}")
        print(f"⚠️ Skipped: {skipped}")
        print(f"❌ Errors: {errors}")
        print("="*60)

    finally:
        mongo_client.close()
        if qdrant_client is not None:
            qdrant_client.close()


if __name__ == "__main__":
    print("Starting full sync from MongoDB to Qdrant...")
    print("This will create embeddings for all listings (may take a few minutes)...\n")
    asyncio.run(sync_all_listings())