Spaces:
Running
Running
| """ | |
| Full Sync Script: MongoDB → Qdrant | |
| =================================== | |
| Syncs ALL active listings from MongoDB to Qdrant with complete data including images. | |
| Points are upserted by a deterministic ID, so listings missing from Qdrant are created and existing points are overwritten with fresh data. | |
| Usage: | |
| python sync_all_listings_to_qdrant.py | |
| """ | |
| import asyncio | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import PointStruct | |
| from structlog import get_logger | |
| import os | |
| from dotenv import load_dotenv | |
| from uuid import UUID | |
| import httpx | |
# Populate os.environ from a local .env file (if any) before reading settings.
load_dotenv()

logger = get_logger(__name__)

# --- Configuration -----------------------------------------------------------
MONGO_URI = os.getenv("MONGODB_URL")
MONGO_DB = os.getenv("MONGODB_DATABASE", "lojiz")
QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")

# Fail fast at import time, checking MongoDB, then Qdrant, then OpenRouter.
if not MONGO_URI:
    raise ValueError("MONGODB_URL environment variable not set in .env file")
if not (QDRANT_URL and QDRANT_API_KEY):
    raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in .env")
if not OPENROUTER_API_KEY:
    raise ValueError("OPENROUTER_API_KEY must be set in .env for embeddings")

COLLECTION_NAME = "listings"
EMBED_MODEL = "qwen/qwen3-embedding-8b"
def get_deterministic_uuid(mongo_id: str) -> str:
    """Map a MongoDB ObjectId string to a stable UUID string for use as a Qdrant point ID.

    The 16-byte MD5 digest of the encoded ``mongo_id`` is used verbatim as the
    UUID's bytes (no RFC 4122 version/variant bits are set). The same input
    therefore always yields the same ID.
    """
    import hashlib

    digest_hex = hashlib.md5(mongo_id.encode()).hexdigest()
    return str(UUID(hex=digest_hex))
async def embed_query(text: str, *, client: "httpx.AsyncClient | None" = None) -> list:
    """Return the embedding vector for ``text`` from OpenRouter's embeddings endpoint.

    Args:
        text: The text to embed.
        client: Optional ``httpx.AsyncClient`` to send the request with, so a
            caller embedding many texts can reuse one connection pool. Its own
            timeout settings apply. When omitted, a temporary client with a 30 s
            timeout is created and closed for this single request.

    Returns:
        The ``embedding`` list of the first item in the response's ``data`` array.

    Raises:
        httpx.HTTPStatusError: If OpenRouter answers with a 4xx/5xx status.
        KeyError / IndexError: If the response JSON lacks ``data[0]["embedding"]``.
    """
    payload = {
        "model": EMBED_MODEL,
        "input": text,
        "encoding_format": "float",
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }

    async def _request(http: "httpx.AsyncClient") -> list:
        # Send the request and extract the vector while `http` is still open.
        response = await http.post(
            "https://openrouter.ai/api/v1/embeddings",
            json=payload,
            headers=headers,
        )
        response.raise_for_status()
        data = response.json()
        return data["data"][0]["embedding"]

    if client is not None:
        return await _request(client)
    async with httpx.AsyncClient(timeout=30) as own_client:
        return await _request(own_client)
def _build_text_and_payload(listing, mongo_id):
    """Return ``(embedding_text, qdrant_payload)`` for one MongoDB listing document.

    Text fields fall back to ``""``, ``bedrooms``/``bathrooms``/``price`` to ``0``,
    ``amenities``/``images`` to ``[]`` and ``currency`` to ``"XOF"`` when missing
    or null. ``latitude``/``longitude`` are copied as-is and may be ``None``.
    """
    title = listing.get("title") or ""
    description = listing.get("description") or ""
    location = listing.get("location") or ""
    address = listing.get("address") or ""
    bedrooms = listing.get("bedrooms") or 0
    price_type = listing.get("price_type") or ""
    # Fall back to a "type" field when "listing_type" is absent or empty.
    listing_type = listing.get("listing_type") or listing.get("type") or ""
    # `or []` rather than a .get() default, so an explicit `images: null`
    # also becomes an empty list instead of None in the payload.
    images = listing.get("images") or []

    text = f"{title}. {bedrooms}-bed in {location} ({address}). {description}".strip()
    payload = {
        "mongo_id": mongo_id,
        "title": title,
        "description": description,
        "location": location,
        "location_lower": location.lower(),
        "address": address,
        "price": float(listing.get("price") or 0),
        "price_type": price_type,
        "price_type_lower": price_type.lower(),
        "listing_type": listing_type,
        "listing_type_lower": listing_type.lower(),
        "bedrooms": int(bedrooms),
        "bathrooms": int(listing.get("bathrooms") or 0),
        "amenities": [a.lower() for a in (listing.get("amenities") or [])],
        "currency": listing.get("currency") or "XOF",
        "status": listing.get("status", "active"),
        "latitude": listing.get("latitude"),
        "longitude": listing.get("longitude"),
        "images": images,
    }
    return text, payload


async def sync_all_listings():
    """Embed every active MongoDB listing and upsert it into the Qdrant collection.

    Each listing becomes one Qdrant point whose ID is derived from its MongoDB
    ``_id`` via ``get_deterministic_uuid``. Re-running the script therefore
    overwrites existing points instead of duplicating them. A failure on one
    listing is logged with its traceback and counted, and the loop moves on to
    the next listing. A summary is printed at the end, and both database
    clients are closed even if an unexpected error escapes.
    """
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    db = mongo_client[MONGO_DB]
    # Synchronous client: its calls block the event loop. That is harmless
    # here because listings are processed one at a time with nothing else
    # scheduled concurrently.
    qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY, timeout=60)

    try:
        # Loaded up front so the total is known for progress output; memory
        # use grows with the number of active listings.
        cursor = db.listings.find({"status": "active"})
        listings = await cursor.to_list(length=None)
        total = len(listings)
        logger.info(f"Found {total} active listings to sync")

        synced = 0
        skipped = 0  # never incremented: each listing ends as synced or as an error
        errors = 0

        for idx, listing in enumerate(listings, 1):
            mongo_id = str(listing["_id"])
            point_id = get_deterministic_uuid(mongo_id)
            try:
                # Build the payload first so a malformed document fails
                # before an embedding request is spent on it.
                text, payload = _build_text_and_payload(listing, mongo_id)
                title = payload["title"]
                print(f"[{idx}/{total}] Generating embedding for: {title[:50]}...")
                vector = await embed_query(text)
                qdrant_client.upsert(
                    collection_name=COLLECTION_NAME,
                    points=[PointStruct(id=point_id, vector=vector, payload=payload)],
                )
            except Exception as e:
                errors += 1
                # Error-level log that also records the traceback.
                logger.exception(
                    f"[{idx}/{total}] β Failed",
                    mongo_id=mongo_id,
                    error=str(e),
                )
                continue

            # Counted and logged outside the try, so a listing that was upserted
            # can never also be counted as an error.
            synced += 1
            logger.info(
                f"[{idx}/{total}] β Synced",
                mongo_id=mongo_id,
                title=title[:30],
                images_count=len(payload["images"]),
            )

        # Summary
        print("\n" + "="*60)
        print("SYNC COMPLETE")
        print("="*60)
        print(f"Total listings: {total}")
        print(f"β Synced: {synced}")
        print(f"β οΈ Skipped: {skipped}")
        print(f"β Errors: {errors}")
        print("="*60)
    finally:
        mongo_client.close()
        qdrant_client.close()
if __name__ == "__main__":
    # Announce the run, then drive the async sync to completion.
    banner = (
        "Starting full sync from MongoDB to Qdrant...",
        "This will create embeddings for all listings (may take a few minutes)...\n",
    )
    for banner_line in banner:
        print(banner_line)
    asyncio.run(sync_all_listings())