File size: 6,643 Bytes
5a6c225
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
Full Sync Script: MongoDB → Qdrant
==================================
Syncs ALL active listings from MongoDB to Qdrant with complete data, including images.
Points are upserted under IDs derived deterministically from each listing's MongoDB _id,
so listings missing from Qdrant are created and listings already present are overwritten.

Usage:
    python sync_all_listings_to_qdrant.py
"""

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from structlog import get_logger
import os
from dotenv import load_dotenv
from uuid import UUID
import httpx

# Read a local .env file (python-dotenv) so the settings below can come from it.
load_dotenv()

logger = get_logger(__name__)

# --- Configuration ---------------------------------------------------------
# Required values are checked here, at import time, so a misconfigured run
# fails before any connection is opened.
MONGO_URI = os.environ.get("MONGODB_URL")
if not MONGO_URI:
    raise ValueError("MONGODB_URL environment variable not set in .env file")
MONGO_DB = os.environ.get("MONGODB_DATABASE", "lojiz")

QDRANT_URL = os.environ.get("QDRANT_URL")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")

# Both Qdrant settings are needed together; checked before the OpenRouter key.
if not (QDRANT_URL and QDRANT_API_KEY):
    raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in .env")
if not OPENROUTER_API_KEY:
    raise ValueError("OPENROUTER_API_KEY must be set in .env for embeddings")

# Target Qdrant collection, and the OpenRouter model used to embed listing text.
COLLECTION_NAME = "listings"
EMBED_MODEL = "qwen/qwen3-embedding-8b"


def get_deterministic_uuid(mongo_id: str) -> str:
    """Map a MongoDB ObjectId string to a stable UUID string for use as a Qdrant point ID.

    The 128-bit MD5 digest of the UTF-8 encoded id is used directly as the
    UUID value. The same ``mongo_id`` therefore always yields the same point ID.
    No RFC 4122 version bits are set. MD5 serves only as a stable mapping here,
    not as a security measure.
    """
    import hashlib

    hex_digest = hashlib.md5(mongo_id.encode("utf-8")).hexdigest()
    return str(UUID(hex=hex_digest))


async def embed_query(text: str, client: "httpx.AsyncClient | None" = None) -> list:
    """Create an embedding for *text* using OpenRouter's embeddings endpoint.

    Args:
        text: The text to embed with ``EMBED_MODEL``.
        client: Optional ``httpx.AsyncClient`` to send the request with. Callers
            embedding many texts can pass one shared client so connections are
            pooled across calls; the caller remains responsible for closing it.
            When omitted, a client with a 30 s timeout is created and closed for
            this single call (the previous behaviour).

    Returns:
        The ``embedding`` field of the first element of the response's ``data`` list.

    Raises:
        httpx.HTTPStatusError: If OpenRouter answers with a 4xx/5xx status.
    """
    if client is None:
        # No shared client supplied: use a short-lived one just for this request.
        async with httpx.AsyncClient(timeout=30) as own_client:
            return await embed_query(text, client=own_client)

    payload = {
        "model": EMBED_MODEL,
        "input": text,
        # Request the vector as a JSON list of floats.
        "encoding_format": "float",
    }
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }

    response = await client.post(
        "https://openrouter.ai/api/v1/embeddings",
        json=payload,
        headers=headers,
    )
    response.raise_for_status()

    data = response.json()
    return data["data"][0]["embedding"]


def _build_embedding_text(listing: dict) -> str:
    """Return the text embedded for *listing*: title, bedrooms, location/address and description."""
    bedrooms = listing.get("bedrooms") or 0
    location = listing.get("location") or ""
    address = listing.get("address") or ""
    title = listing.get("title") or ""
    description = listing.get("description") or ""
    return f"{title}. {bedrooms}-bed in {location} ({address}). {description}".strip()


def _build_payload(listing: dict, mongo_id: str) -> dict:
    """Return the Qdrant payload for *listing*, including lower-cased copies of text fields."""
    location = listing.get("location") or ""
    price_type = listing.get("price_type") or ""
    listing_type = listing.get("listing_type") or listing.get("type") or ""
    return {
        "mongo_id": mongo_id,
        "title": listing.get("title") or "",
        "description": listing.get("description") or "",
        "location": location,
        "location_lower": location.lower() if location else "",
        "address": listing.get("address") or "",
        "price": float(listing.get("price") or 0),
        "price_type": price_type,
        "price_type_lower": price_type.lower() if price_type else "",
        "listing_type": listing_type,
        "listing_type_lower": listing_type.lower() if listing_type else "",
        "bedrooms": int(listing.get("bedrooms") or 0),
        "bathrooms": int(listing.get("bathrooms") or 0),
        "amenities": [a.lower() for a in (listing.get("amenities") or [])],
        "currency": listing.get("currency", "XOF"),
        "status": listing.get("status", "active"),
        "latitude": listing.get("latitude"),
        "longitude": listing.get("longitude"),
        # A stored null is normalised to [] so the payload always holds a
        # sequence and the images_count log line cannot fail on len(None).
        "images": listing.get("images") or [],
    }


async def sync_all_listings():
    """Embed every active MongoDB listing and upsert it into the Qdrant collection.

    Each listing becomes one point whose ID comes from ``get_deterministic_uuid``
    applied to its Mongo ``_id``. Re-running the sync therefore overwrites the
    same points instead of adding duplicates. A failure on one listing is logged
    and counted, and the sync moves on to the next one. A summary of the synced
    and failed counts is printed at the end.
    """
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    db = mongo_client[MONGO_DB]
    qdrant_client = None

    try:
        # Created inside ``try`` so the Mongo client is still closed if this raises.
        qdrant_client = QdrantClient(
            url=QDRANT_URL,
            api_key=QDRANT_API_KEY,
            timeout=60,
        )

        # NOTE: loads every active listing into memory at once.
        cursor = db.listings.find({"status": "active"})
        listings = await cursor.to_list(length=None)

        total = len(listings)
        logger.info(f"Found {total} active listings to sync")

        synced = 0
        skipped = 0  # NOTE(review): never incremented; kept so the summary layout is unchanged.
        errors = 0

        for idx, listing in enumerate(listings, 1):
            mongo_id = str(listing["_id"])
            point_id = get_deterministic_uuid(mongo_id)
            title = listing.get("title") or ""

            try:
                # Build the payload first: it is cheap and may fail on malformed
                # data, which would otherwise waste a paid embedding request.
                payload = _build_payload(listing, mongo_id)

                print(f"[{idx}/{total}] Generating embedding for: {title[:50]}...")
                vector = await embed_query(_build_embedding_text(listing))

                # QdrantClient.upsert is called synchronously (not awaited) here.
                qdrant_client.upsert(
                    collection_name=COLLECTION_NAME,
                    points=[
                        PointStruct(
                            id=point_id,
                            vector=vector,
                            payload=payload,
                        )
                    ],
                )
            except Exception as e:
                # Per-listing boundary: record the failure and continue with the rest.
                errors += 1
                logger.error(
                    f"[{idx}/{total}] ❌ Failed",
                    mongo_id=mongo_id,
                    error=str(e),
                )
            else:
                # Counted only after a successful upsert, so a listing can no
                # longer be tallied as both synced and failed.
                synced += 1
                logger.info(
                    f"[{idx}/{total}] ✅ Synced",
                    mongo_id=mongo_id,
                    title=title[:30],
                    images_count=len(payload["images"]),
                )

        # Summary
        print("\n" + "=" * 60)
        print("SYNC COMPLETE")
        print("=" * 60)
        print(f"Total listings:  {total}")
        print(f"✅ Synced:       {synced}")
        print(f"⚠️  Skipped:      {skipped}")
        print(f"❌ Errors:       {errors}")
        print("=" * 60)

    finally:
        mongo_client.close()
        if qdrant_client is not None:
            qdrant_client.close()


def _main() -> None:
    """Print a start-up banner, then run the full sync to completion."""
    banner = (
        "Starting full sync from MongoDB to Qdrant...",
        "This will create embeddings for all listings (may take a few minutes)...\n",
    )
    for line in banner:
        print(line)
    asyncio.run(sync_all_listings())


if __name__ == "__main__":
    _main()