# AIDA / sync_all_listings_to_qdrant.py
# destinyebuka's picture
# fyp
# 5a6c225
"""
Full Sync Script: MongoDB → Qdrant
===================================
Syncs ALL active listings from MongoDB to Qdrant with complete data including images.
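Points are upserted under a deterministic ID derived from each listing's MongoDB _id, so listings not yet in Qdrant get new points and listings already there have their point (vector and payload) overwritten.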
Usage:
python sync_all_listings_to_qdrant.py
"""
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from structlog import get_logger
import os
from dotenv import load_dotenv
from uuid import UUID
import httpx
# Pull variables from a local .env file into the process environment first,
# so every os.getenv() below can see them.
load_dotenv()

logger = get_logger(__name__)

# --- Configuration -----------------------------------------------------------
# The MongoDB connection string is mandatory: fail fast at import time.
MONGO_URI = os.getenv("MONGODB_URL")
if not MONGO_URI:
    raise ValueError("MONGODB_URL environment variable not set in .env file")
MONGO_DB = os.getenv("MONGODB_DATABASE", "lojiz")

# Qdrant endpoint + credentials, and the OpenRouter key used for embeddings.
QDRANT_URL, QDRANT_API_KEY, OPENROUTER_API_KEY = (
    os.getenv(var_name)
    for var_name in ("QDRANT_URL", "QDRANT_API_KEY", "OPENROUTER_API_KEY")
)
if not (QDRANT_URL and QDRANT_API_KEY):
    raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in .env")
if not OPENROUTER_API_KEY:
    raise ValueError("OPENROUTER_API_KEY must be set in .env for embeddings")

COLLECTION_NAME = "listings"  # Qdrant collection that receives the points
EMBED_MODEL = "qwen/qwen3-embedding-8b"  # OpenRouter embedding model id
def get_deterministic_uuid(mongo_id: str) -> str:
    """Map a MongoDB ObjectId string to a stable UUID string for use as a Qdrant point ID.

    The 128-bit MD5 digest of ``mongo_id`` is reinterpreted directly as a UUID,
    so the same id always produces the same point ID and a re-run upsert
    overwrites rather than duplicates. The version/variant bits are left as
    the digest produced them (this is not a conformant UUIDv3).
    """
    import hashlib

    hex_digest = hashlib.md5(mongo_id.encode()).hexdigest()
    # UUID(hex=...) of the hex digest is the same value as UUID(bytes=digest).
    return str(UUID(hex=hex_digest))
async def embed_query(text: str, client: "httpx.AsyncClient | None" = None) -> list:
    """Create an embedding for *text* via OpenRouter's embeddings endpoint.

    Args:
        text: String to embed with ``EMBED_MODEL``.
        client: Optional shared ``httpx.AsyncClient``. Bulk callers can pass
            one to reuse pooled connections across many requests. When
            omitted, a client with a 30-second timeout is created for this
            call only and closed afterwards (the original behaviour).

    Returns:
        The ``embedding`` of the first item in the response's ``data`` list.

    Raises:
        httpx.HTTPStatusError: The endpoint answered with a 4xx/5xx status.
        RuntimeError: The JSON body contained no embedding (instead of an
            opaque ``KeyError``/``IndexError``).
    """
    url = "https://openrouter.ai/api/v1/embeddings"
    payload = {
        "model": EMBED_MODEL,
        "input": text,
        "encoding_format": "float",
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }

    if client is None:
        async with httpx.AsyncClient(timeout=30) as own_client:
            response = await own_client.post(url, json=payload, headers=headers)
    else:
        response = await client.post(url, json=payload, headers=headers)
    response.raise_for_status()

    data = response.json()
    items = data.get("data") if isinstance(data, dict) else None
    if not items or not isinstance(items[0], dict) or "embedding" not in items[0]:
        # Include a prefix of the body so whatever the API sent back is visible.
        raise RuntimeError(f"No embedding in OpenRouter response: {str(data)[:300]}")
    return items[0]["embedding"]
def _prepare_listing(listing, mongo_id):
    """Return ``(embedding_text, payload)`` for one MongoDB listing document.

    Nullable fields are normalised with ``or`` so a stored null behaves like a
    missing field. Raises ``ValueError``/``TypeError`` when a numeric field
    (price, bedrooms, bathrooms) cannot be converted.
    """
    bedrooms = listing.get("bedrooms") or 0
    location = listing.get("location") or ""
    address = listing.get("address") or ""
    title = listing.get("title") or ""
    description = listing.get("description") or ""
    text = f"{title}. {bedrooms}-bed in {location} ({address}). {description}".strip()

    price_type = listing.get("price_type") or ""
    # Prefer "listing_type", falling back to a "type" field.
    listing_type = listing.get("listing_type") or listing.get("type") or ""
    payload = {
        "mongo_id": mongo_id,
        "title": title,
        "description": description,
        "location": location,
        # *_lower copies presumably back case-insensitive filtering on the
        # search side -- confirm before removing.
        "location_lower": location.lower() if location else "",
        "address": address,
        "price": float(listing.get("price") or 0),
        "price_type": price_type,
        "price_type_lower": price_type.lower() if price_type else "",
        "listing_type": listing_type,
        "listing_type_lower": listing_type.lower() if listing_type else "",
        "bedrooms": int(bedrooms),
        "bathrooms": int(listing.get("bathrooms") or 0),
        # Skip non-string entries (e.g. null) instead of failing the whole listing.
        "amenities": [
            a.lower() for a in (listing.get("amenities") or []) if isinstance(a, str)
        ],
        # `or` (not a .get default) so a stored null also falls back to "XOF".
        "currency": listing.get("currency") or "XOF",
        "status": listing.get("status", "active"),
        "latitude": listing.get("latitude"),
        "longitude": listing.get("longitude"),
        # A stored null becomes [] so the payload is always a list and len() is safe.
        "images": listing.get("images") or [],
    }
    return text, payload


async def _sync_listings(db, qdrant_client):
    """Embed and upsert every active listing in *db*, then print a summary."""
    # The whole result set is loaded up front so progress lines can show a total.
    listings = await db.listings.find({"status": "active"}).to_list(length=None)
    total = len(listings)
    logger.info(f"Found {total} active listings to sync")

    synced = 0
    skipped = 0  # Nothing increments this yet; kept so the summary layout is unchanged.
    errors = 0

    for idx, listing in enumerate(listings, 1):
        mongo_id = str(listing["_id"])
        point_id = get_deterministic_uuid(mongo_id)
        try:
            # Build text and payload before embedding so a malformed document
            # (e.g. a non-numeric price) fails without spending an API call.
            text, payload = _prepare_listing(listing, mongo_id)
            title = payload["title"]

            print(f"[{idx}/{total}] Generating embedding for: {title[:50]}...")
            vector = await embed_query(text)

            qdrant_client.upsert(
                collection_name=COLLECTION_NAME,
                points=[PointStruct(id=point_id, vector=vector, payload=payload)],
            )
            synced += 1
            logger.info(
                f"[{idx}/{total}] ✅ Synced",
                mongo_id=mongo_id,
                title=title[:30],
                images_count=len(payload["images"]),
            )
        except Exception as e:
            # Best-effort per listing: record the failure and continue the run.
            errors += 1
            logger.error(
                f"[{idx}/{total}] ❌ Failed",
                mongo_id=mongo_id,
                error=str(e),
            )

    # Summary
    print("\n" + "=" * 60)
    print("SYNC COMPLETE")
    print("=" * 60)
    print(f"Total listings: {total}")
    print(f"✅ Synced: {synced}")
    print(f"⚠️ Skipped: {skipped}")
    print(f"❌ Errors: {errors}")
    print("=" * 60)


async def sync_all_listings():
    """Sync all active listings from MongoDB to Qdrant.

    Each listing becomes one Qdrant point whose ID comes from
    ``get_deterministic_uuid(str(_id))``, so re-running the script overwrites
    existing points instead of duplicating them. Per-listing failures are
    logged and counted without aborting the run. Both clients are closed on
    every exit path, including when the Qdrant client itself fails to
    construct.
    """
    from contextlib import closing

    with closing(AsyncIOMotorClient(MONGO_URI)) as mongo_client:
        with closing(
            QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY, timeout=60)
        ) as qdrant_client:
            await _sync_listings(mongo_client[MONGO_DB], qdrant_client)
if __name__ == "__main__":
    # Script entry point: announce the run, then drive the async sync to completion.
    intro_lines = (
        "Starting full sync from MongoDB to Qdrant...",
        "This will create embeddings for all listings (may take a few minutes)...\n",
    )
    for intro in intro_lines:
        print(intro)
    asyncio.run(sync_all_listings())