"""Gradio tool that extracts product metadata from Excel and uploads it to MongoDB.

The module provides:

* ``MongoDBHandler``  - thin wrapper around a PyMongo connection.
* ``DataProcessing``  - Excel loading/cleaning and PDF manual ("HDSD") extraction
  through Docling.
* Gradio callbacks and the UI builder ``create_processing_interface``.
"""

import sys
import os
from pathlib import Path
import re
import tempfile
import json
import math
import time
import warnings
import functools
from typing import Dict, List, Optional
from datetime import datetime, timezone

import requests
from urllib3.exceptions import IncompleteRead
import docling
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
import pandas as pd
import gradio as gr
from pymongo import MongoClient, UpdateOne
from pymongo.errors import ConnectionFailure, OperationFailure

from data_helper import *
from config import MONGODB_URI

# Suppress PyTorch DataLoader pin_memory warning on MPS
warnings.filterwarnings("ignore", message=".*pin_memory.*not supported on MPS.*")

DEFAULT_MONGO_URI = "mongodb://localhost:27017/"
DEFAULT_DATABASE_NAME = "product_database"
FALLBACK_COLLECTION = "unknown_products"

# Single source of truth: category label (as written in the Excel file and
# shown in the UI) -> MongoDB collection that stores it. Insertion order is the
# order used by the dropdown and by "process all categories".
CATEGORY_COLLECTIONS = {
    "Sản phẩm nhà thông minh": "sp_nha_thong_minh",
    "Đèn LED": "sp_chieu_sang",
    "Chiếu sáng chuyên dụng": "sp_chuyen_dung",
    "Thiết bị điện": "sp_thiet_bi_dien",
    "Phích nước": "sp_phich_nuoc",
}


@functools.lru_cache(maxsize=1)
def _get_pdf_converter():
    """Build (once) the Docling converter used for every PDF manual.

    OCR and table-structure detection are disabled, as in the original
    per-call configuration. The instance is cached so that it is not rebuilt
    for each product.
    """
    pipeline_options = PdfPipelineOptions()
    pipeline_options.do_ocr = False
    pipeline_options.do_table_structure = False
    return DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        }
    )


class MongoDBHandler:
    """Handler for MongoDB operations"""

    def __init__(self, connection_string: Optional[str] = None,
                 database_name: str = "product_database"):
        """
        Initialize MongoDB connection settings (no network access happens here).

        Args:
            connection_string: MongoDB connection string (default: localhost)
            database_name: Name of the database to use
        """
        if connection_string is None:
            connection_string = DEFAULT_MONGO_URI
        self.connection_string = connection_string
        self.database_name = database_name
        self.client = None
        self.db = None

    def connect(self) -> bool:
        """Establish connection to MongoDB.

        Returns:
            True when the server answered a ``ping``; False otherwise (the
            reason is printed). On failure no client is kept open.
        """
        client = None
        try:
            client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)
            # Test connection
            client.admin.command('ping')
            database = client[self.database_name]
        except ConnectionFailure as e:
            print(f"❌ Failed to connect to MongoDB: {e}")
        except Exception as e:
            print(f"❌ Unexpected error connecting to MongoDB: {e}")
        else:
            self.client = client
            self.db = database
            print(f"✅ Connected to MongoDB database: {self.database_name}")
            return True

        # Failure path: do not leak the half-open client.
        if client is not None:
            client.close()
        return False

    def disconnect(self):
        """Close MongoDB connection and forget the client/database handles."""
        if self.client is not None:
            self.client.close()
            # Reset so that upload_data() reports "Not connected" instead of
            # trying to use a closed client.
            self.client = None
            self.db = None
            print("🔌 Disconnected from MongoDB")

    def get_collection_name(self, category: str) -> str:
        """Map category name to collection name ("unknown_products" if unmapped)."""
        return CATEGORY_COLLECTIONS.get(category, FALLBACK_COLLECTION)

    def upload_data(self, data: List[Dict], collection_name: str, upsert: bool = True) -> Dict:
        """
        Upload data to MongoDB collection

        Args:
            data: List of product dictionaries (not modified by this method)
            collection_name: Name of the collection
            upsert: If True, update existing documents (matched on
                ``Product_ID``) or insert new ones; if False, plain insert.

        Returns:
            Dictionary with upload statistics. ``success`` is always present;
            on failure ``error`` holds the reason. In upsert mode ``skipped``
            counts the items ignored because they had no ``Product_ID``.
        """
        if self.db is None:
            return {"success": False, "error": "Not connected to database"}
        if not data:
            return {"success": False, "error": "No data to upload"}

        try:
            collection = self.db[collection_name]
            timestamp = datetime.now(timezone.utc)

            if not upsert:
                # Simple insert (may cause duplicates). Copies are inserted so
                # the caller's dictionaries are left untouched.
                documents = []
                for item in data:
                    document = dict(item)
                    document['_updated_at'] = timestamp
                    document.setdefault('_created_at', timestamp)
                    documents.append(document)
                result = collection.insert_many(documents)
                return {
                    "success": True,
                    "collection": collection_name,
                    "inserted": len(result.inserted_ids),
                    "total": len(data),
                }

            # Use bulk write with upsert for better performance
            operations = []
            for item in data:
                product_id = item.get('Product_ID')
                if not product_id:
                    continue
                fields = {k: v for k, v in item.items() if k != '_created_at'}
                fields['_updated_at'] = timestamp
                operations.append(
                    UpdateOne(
                        {'Product_ID': product_id},
                        {
                            '$set': fields,
                            # Written only when the document is created, so an
                            # update never resets the creation time.
                            '$setOnInsert': {
                                '_created_at': item.get('_created_at', timestamp)
                            },
                        },
                        upsert=True,
                    )
                )

            if not operations:
                return {"success": False, "error": "No valid product IDs found"}

            result = collection.bulk_write(operations)
            return {
                "success": True,
                "collection": collection_name,
                "inserted": result.upserted_count,
                "modified": result.modified_count,
                "matched": result.matched_count,
                "total": len(data),
                "skipped": len(data) - len(operations),
            }
        except OperationFailure as e:
            return {"success": False, "error": f"MongoDB operation failed: {e}"}
        except Exception as e:
            return {"success": False, "error": f"Unexpected error: {e}"}

    def test_connection(self) -> str:
        """Test MongoDB connection and return a human-readable status string."""
        try:
            if not self.connect():
                return "❌ Connection failed"
            try:
                # Get database stats
                stats = self.db.command("dbstats")
                collections = self.db.list_collection_names()
            finally:
                # Always release the connection, even if the stats query fails.
                self.disconnect()
            return f"✅ Connected successfully!\n📊 Database: {self.database_name}\n📁 Collections: {len(collections)}\n💾 Size: {stats.get('dataSize', 0) / 1024 / 1024:.2f} MB"
        except Exception as e:
            return f"❌ Error: {str(e)}"


class DataProcessing:
    """Load product rows from Excel, normalise them and attach manual text."""

    # Free-text columns whose line breaks are kept.
    _MULTILINE_COLUMNS = (
        "Tóm tắt ưu điểm, tính năng",
        "Thông số kỹ thuật",
        "Nội dung Ưu điểm SP",
        "Ưu điểm",
    )
    # Columns in which the literal text "none" stands for "no value".
    _NONE_PLACEHOLDER_COLUMNS = (
        "Thông số kỹ thuật",
        "Tóm tắt ưu điểm, tính năng",
        "Tóm tắt TSKT",
        "Nội dung Ưu điểm SP",
    )
    # Columns stored lower-cased.
    _LOWERCASE_COLUMNS = ("Tóm tắt TSKT", "Thông số kỹ thuật")
    # Tag keys whose value is reduced to its first number.
    _NUMERIC_TAG_KEYS = frozenset({
        "dung_tich", "cong_suat", "lo_khoet_tran", "so_cuc",
        "so_hat", "modules", "cuon_day", "kich_thuoc",
    })
    # First well-formed decimal number in a string ("12", "3.5", ".5").
    _NUMBER_RE = re.compile(r"\d+(?:\.\d+)?|\.\d+")

    def __init__(self):
        pass

    def get_data_from_excel_file(self, excel_path, key_match, collection_name,
                                 processor_type="docling", mongo_handler=None):
        """
        Process Excel file and upload to MongoDB

        Args:
            excel_path: Path to Excel file
            key_match: Category to match (value of the "category 1" column)
            collection_name: MongoDB collection name
            processor_type: Type of PDF processor
            mongo_handler: MongoDBHandler instance (required)

        Returns:
            A status message (success summary or the reason for failure).
        """
        if not mongo_handler:
            return "❌ MongoDB handler not provided"

        result_df = self._load_category_rows(excel_path, key_match)
        if result_df is None or result_df.empty:
            return f"❌ Data not found for key: {key_match}"

        result_df = self._clean_frame(result_df)
        result_df = self.data_normalization(result_df=result_df)

        data = result_df.to_dict(orient="records")
        data = self.convert_floats(data)
        data = self.replace_nan_with_none(data)

        # Process instructions based on processor type
        ocr_processor = getattr(self, "process_instruction_with_tesseract", None)
        if processor_type == "docling_with_ocr" and callable(ocr_processor):
            data = ocr_processor(data)
        else:
            if processor_type == "docling_with_ocr":
                # No OCR implementation exists on this class; degrade instead
                # of failing with AttributeError.
                print("OCR processor is not available, falling back to docling")
            data = self.process_instruction(data)

        # Upload to MongoDB
        if not mongo_handler.connect():
            return "❌ Failed to connect to MongoDB"
        try:
            result = mongo_handler.upload_data(data, collection_name, upsert=True)
        finally:
            mongo_handler.disconnect()

        if result.get("success"):
            return f"✅ Uploaded to MongoDB collection '{result['collection']}':\n" \
                   f"   • Total records: {result['total']}\n" \
                   f"   • Inserted: {result.get('inserted', 0)}\n" \
                   f"   • Updated: {result.get('modified', 0)}"
        return f"❌ MongoDB upload failed: {result.get('error', 'Unknown error')}"

    @staticmethod
    def _stripped_columns(df):
        """Return the column labels of *df* with string labels stripped."""
        return [col.strip() if isinstance(col, str) else col for col in df.columns]

    def _load_category_rows(self, excel_path, key_match):
        """Collect the rows whose "category 1" equals *key_match*.

        Every sheet except the first two is inspected. Sheets are read with
        the header on the second row; when that does not expose a
        "category 1" column the sheet is re-read with the header on the first
        row. Returns a concatenated DataFrame, or None when nothing matched.
        """
        all_sheets = pd.read_excel(excel_path, sheet_name=None, header=1)
        matches = []
        for sheet_name in list(all_sheets)[2:]:
            df = all_sheets[sheet_name]
            df.columns = self._stripped_columns(df)
            if "category 1" not in df.columns:
                df = pd.read_excel(excel_path, sheet_name=sheet_name, header=0)
                df.columns = self._stripped_columns(df)
            if "category 1" not in df.columns:
                continue
            categories = df["category 1"].astype(str).str.replace("\n", " ").str.strip()
            filtered = df[categories == key_match]
            if not filtered.empty:
                matches.append(filtered)
        if not matches:
            return None
        return pd.concat(matches, ignore_index=True)

    @staticmethod
    def _is_helper_column(col):
        """True for spreadsheet helper columns ("Unnamed…", "a", "STT")."""
        if not isinstance(col, str):
            return False
        name = col.strip()
        return name.lower().startswith("unnamed") or name == "a" or col == "STT"

    def _clean_frame(self, result_df):
        """Drop helper columns, tidy whitespace and de-duplicate products."""
        result_df = result_df.where(pd.notnull(result_df), None)
        result_df["HDSD"] = None

        cols_to_drop = [col for col in result_df.columns if self._is_helper_column(col)]
        result_df = result_df.drop(columns=cols_to_drop, errors='ignore')

        # Line breaks are flattened everywhere except in the free-text columns.
        cols_to_replace = [col for col in result_df.columns
                           if col not in self._MULTILINE_COLUMNS]
        result_df[cols_to_replace] = result_df[cols_to_replace].replace('\n', ' ', regex=True)

        result_df = result_df.map(lambda x: x.strip() if isinstance(x, str) else x)

        # Replace "none" values with None (after stripping, so " none " also
        # counts; columns missing from the workbook are simply skipped).
        for col in self._NONE_PLACEHOLDER_COLUMNS:
            if col in result_df.columns:
                result_df.loc[result_df[col] == "none", col] = None

        if "Product_ID" in result_df.columns:
            result_df = result_df.drop_duplicates(subset=["Product_ID"])
        return result_df

    def convert_floats(self, obj):
        """Recursively turn integral floats (``3.0``) into ints (``3``)."""
        if isinstance(obj, float) and obj.is_integer():
            return int(obj)
        if isinstance(obj, list):
            return [self.convert_floats(i) for i in obj]
        if isinstance(obj, dict):
            return {k: self.convert_floats(v) for k, v in obj.items()}
        return obj

    def strip_redundant_space(self, text):
        """Collapse whitespace runs to single spaces; non-strings pass through."""
        if not isinstance(text, str):
            # Empty cells arrive as None/NaN; leave them for the NaN -> None pass.
            return text
        return " ".join(text.split())

    def convert_tag_to_dict(self, tag_str: str) -> dict:
        """Parse a loose ``{key: value, key: v1, v2}`` tag string into a dict.

        Comma-separated fragments without a ``:`` are appended to the previous
        key, turning its value into a list, e.g.
        ``"{cong_suat: 10w, mau: trang, vang}"`` ->
        ``{"cong_suat": "10w", "mau": ["trang", "vang"]}``.
        Anything that is not a string starting with ``{`` yields ``{}``.
        """
        if not isinstance(tag_str, str) or not tag_str.strip().startswith("{"):
            return {}
        try:
            fixed = re.sub(r'([{,]\s*)(\w+)\s*:', r'\1"\2":', tag_str)
            raw_pairs = [pair.strip() for pair in fixed.strip('{} ').split(',')
                         if pair.strip()]
            result = {}
            current_key = None
            for pair in raw_pairs:
                if ':' in pair:
                    key, value = pair.split(':', 1)
                    key = key.strip().strip('"')
                    # The value cannot contain a comma here (the string was
                    # split on commas above), so it is always a single string.
                    result[key] = value.strip()
                    current_key = key
                elif current_key:
                    previous_value = result[current_key]
                    if isinstance(previous_value, list):
                        previous_value.append(pair)
                    else:
                        result[current_key] = [previous_value, pair]
            return result
        except Exception as e:
            print(f"Error parse tag: {tag_str} -> {e}")
            return {}

    def convert_tags_to_numeric(self, tags_dict):
        """Replace selected tag values by the first number they contain.

        Only the keys in ``_NUMERIC_TAG_KEYS`` are touched; a value without a
        number is kept unchanged. Integral results are returned as int.
        """
        new_tags = {}
        for key, value in tags_dict.items():
            if key in self._NUMERIC_TAG_KEYS:
                match = self._NUMBER_RE.search(str(value))
                if match:
                    num = float(match.group(0))
                    value = int(num) if num.is_integer() else num
            new_tags[key] = value
        return new_tags

    @staticmethod
    def _is_missing(value):
        """True for None and float NaN."""
        return value is None or (isinstance(value, float) and math.isnan(value))

    def _lower_text(self, value):
        """Lower-case and strip *value* as text, leaving missing values alone."""
        if self._is_missing(value):
            # str(None) would produce the literal "none" and undo the
            # placeholder clean-up done earlier.
            return value
        return str(value).lower().strip()

    def data_normalization(self, result_df):
        """Normalise tags, price, product name and specification columns."""
        if "Tags" in result_df.columns:
            result_df["Tags"] = result_df["Tags"].astype(str).str.lower().apply(self.convert_tag_to_dict)
            result_df["Tags"] = result_df["Tags"].apply(self.convert_tags_to_numeric)
        if "Giá" in result_df.columns:
            result_df["Giá"] = result_df["Giá"].apply(lambda x: "Liên hệ" if x == 0 else x)
        if "Tên sản phẩm" in result_df.columns:
            result_df["Tên sản phẩm"] = result_df["Tên sản phẩm"].apply(self.strip_redundant_space)
        for col_name in self._LOWERCASE_COLUMNS:
            if col_name in result_df.columns:
                result_df[col_name] = result_df[col_name].apply(self._lower_text)
        return result_df

    def replace_nan_with_none(self, obj):
        """Recursively replace float NaN by None in nested dicts/lists."""
        if isinstance(obj, float) and math.isnan(obj):
            return None
        if isinstance(obj, dict):
            return {k: self.replace_nan_with_none(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self.replace_nan_with_none(i) for i in obj]
        return obj

    @staticmethod
    def download_pdf_with_retry(url, max_retries=3, timeout=30):
        """Download PDF with retry logic and better error handling.

        Args:
            url: Address of the file.
            max_retries: Number of attempts (must be >= 1).
            timeout: Per-request timeout in seconds, passed to ``requests``.

        Returns:
            The response body as bytes.

        Raises:
            ValueError: if ``max_retries`` is smaller than 1.
            requests.exceptions.RequestException, IncompleteRead,
            ConnectionError: the error of the last attempt when all fail.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        for attempt in range(max_retries):
            try:
                print(f"Downloading PDF (attempt {attempt + 1}/{max_retries})...")
                with requests.Session() as session:
                    session.headers.update({
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
                    })
                    with session.get(url, stream=True, timeout=timeout) as response:
                        response.raise_for_status()
                        content_length = response.headers.get('content-length')
                        if content_length and content_length.isdigit():
                            print(f"Expected file size: {int(content_length):,} bytes")
                        # join() avoids the quadratic cost of repeated bytes +=.
                        content = b"".join(
                            chunk for chunk in response.iter_content(chunk_size=8192) if chunk
                        )
                print(f"\nDownload completed: {len(content):,} bytes")
                return content
            except (requests.exceptions.RequestException, IncompleteRead, ConnectionError) as e:
                print(f"Download attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # exponential back-off: 1s, 2s, 4s...
                    print(f"Waiting {wait_time} seconds before retry...")
                    time.sleep(wait_time)
                else:
                    print("All download attempts failed")
                    raise

    @staticmethod
    def process_pdf_with_docling(url):
        """Process PDF from URL using Docling for better structure extraction.

        Returns:
            The Docling conversion result, or None when download or conversion
            failed (the error is printed). The temporary file is removed in
            either case.
        """
        tmp_path = None
        try:
            pdf_content = DataProcessing.download_pdf_with_retry(url)
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
                # Record the path first so the file is cleaned up even if the
                # write itself fails.
                tmp_path = tmp_file.name
                tmp_file.write(pdf_content)
            print(f"PDF saved to temporary file: {tmp_path}")

            converter = _get_pdf_converter()
            print("Converting document with Docling...")
            return converter.convert(tmp_path)
        except Exception as e:
            print(f"Error processing PDF with Docling from URL {url}: {e}")
            return None
        finally:
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError as e:
                    print(f"Could not remove temporary file {tmp_path}: {e}")
                else:
                    print("Temporary file cleaned up")

    @staticmethod
    def extract_content_from_docling_result(docling_result):
        """Extract content from Docling result in a more robust way.

        Returns:
            None for a falsy input; otherwise a dict with exactly one of the
            keys ``markdown``, ``text`` or ``error``. Markdown export is tried
            first; ``main_text``, ``body`` and ``elements`` are fallbacks.
        """
        if not docling_result:
            return None
        try:
            doc = docling_result.document
            try:
                return {'markdown': doc.export_to_markdown()}
            except Exception as e:
                print(f"Markdown export failed: {e}")

            if hasattr(doc, 'main_text'):
                return {'text': doc.main_text}
            for attr_name in ('body', 'elements'):
                elements = getattr(doc, attr_name, None)
                if elements:
                    return {'text': '\n'.join(str(element) for element in elements)}
            return {'error': 'No accessible content found'}
        except Exception as e:
            return {'error': f"Error extracting content: {e}"}

    @staticmethod
    def _clean_instruction_text(text):
        """Return the extracted manual text with surrounding whitespace removed."""
        # NOTE(review): the previous code ran re.sub() with an EMPTY pattern
        # before stripping, which changes nothing. Presumably a placeholder
        # pattern was lost at some point - confirm what should be removed.
        return text.strip()

    def process_instruction(self, data):
        """Fetch the user-manual ("HDSD") text for every product.

        For each item the "Link file HDSD" value decides the outcome:

        * empty                      -> ``HDSD = ""``
        * not an http(s) URL         -> the value itself is stored as the manual
        * URL without "hdsd" or containing "Khong" -> ``HDSD = ""``
        * otherwise the PDF is downloaded and converted; on failure ``HDSD``
          keeps its previous value.

        The items are updated in place; a shallow copy of the list is returned.
        """
        tmp_data = data[:]
        for item in tmp_data:
            instruction_url = item.get("Link file HDSD", None)
            if not instruction_url:
                print("No instruction URL found, skipping...")
                item["HDSD"] = ""
                continue
            if not isinstance(instruction_url, str):
                # e.g. a numeric cell: treat it as inline instruction text.
                instruction_url = str(instruction_url)

            if "https://" not in instruction_url and "http://" not in instruction_url:
                print("Wrong URL, but has instruction info")
                item["HDSD"] = instruction_url
                continue

            if "hdsd" not in instruction_url or "Khong" in instruction_url:
                print("invalid instruction url/content")
                item["HDSD"] = ""
                continue

            raw_result = DataProcessing.process_pdf_with_docling(instruction_url)
            if not raw_result:
                continue
            extract_result = DataProcessing.extract_content_from_docling_result(raw_result)
            for content_key in ('markdown', 'text'):
                if content_key in extract_result:
                    item["HDSD"] = self._clean_instruction_text(extract_result[content_key])
                    break
        return tmp_data


def process_single_category(excel_path, category_name, processor_type,
                            mongo_connection, mongo_database, progress=gr.Progress()):
    """Process a single product category and upload to MongoDB.

    Returns a status message; errors are reported in the message rather than
    raised so that they show up in the UI log.
    """
    if excel_path is None:
        return "❌ Please upload an Excel file first"

    if category_name not in CATEGORY_COLLECTIONS:
        return f"❌ Unknown category: {category_name}"
    # The Excel "category 1" value is identical to the UI label.
    key_match = category_name
    collection_name = CATEGORY_COLLECTIONS[category_name]

    try:
        progress(0.1, desc="Initializing data processor...")
        dp = DataProcessing()

        # Initialize MongoDB handler
        mongo_handler = MongoDBHandler(
            connection_string=mongo_connection if mongo_connection else None,
            database_name=mongo_database if mongo_database else DEFAULT_DATABASE_NAME,
        )

        progress(0.3, desc=f"Processing {category_name} with {processor_type}...")
        result = dp.get_data_from_excel_file(
            excel_path=excel_path,
            key_match=key_match,
            collection_name=collection_name,
            processor_type=processor_type,
            mongo_handler=mongo_handler,
        )

        progress(1.0, desc="Processing completed!")
        return result
    except Exception as e:
        return f"❌ Error processing {category_name}: {str(e)}"


def process_all_categories(excel_path, processor_type, mongo_connection,
                           mongo_database, progress=gr.Progress()):
    """Process all product categories and upload to MongoDB.

    Returns one "<category>: <status>" line per category.
    """
    if excel_path is None:
        return "❌ Please upload an Excel file first"

    categories = list(CATEGORY_COLLECTIONS)
    total_categories = len(categories)
    results = []
    for i, category in enumerate(categories):
        # Report the fraction already finished, so the bar reaches 100% only
        # after the last category is done.
        progress(i / total_categories, desc=f"Processing {category}...")
        result = process_single_category(
            excel_path, category, processor_type, mongo_connection, mongo_database
        )
        results.append(f"{category}: {result}")
    progress(1.0, desc="Processing completed!")

    return "\n".join(results)


def test_mongo_connection(connection_string, database_name):
    """Test MongoDB connection, falling back to the local defaults."""
    if not connection_string:
        connection_string = DEFAULT_MONGO_URI
    if not database_name:
        database_name = DEFAULT_DATABASE_NAME

    handler = MongoDBHandler(connection_string, database_name)
    return handler.test_connection()


def create_processing_interface():
    """Create Gradio interface with MongoDB-only storage.

    Returns the ``gr.Blocks`` application (not yet launched).
    """
    category_choices = list(CATEGORY_COLLECTIONS)

    # NOTE(review): the widget nesting below was reconstructed from source
    # whose indentation had been lost - confirm the layout (in particular
    # that the action buttons and info panels belong to the right column).
    with gr.Blocks(title="Data Processing - Product Metadata Extractor") as demo:
        gr.Markdown("# 📊 Product Data Processing")
        gr.Markdown("Extract and process product metadata from Excel files and upload to MongoDB")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📤 Upload Excel File")
                excel_upload = gr.File(
                    label="Upload Excel File",
                    file_types=[".xlsx", ".xls"],
                    type="filepath"
                )

                gr.Markdown("### ⚙️ Processing Settings")
                processor_dropdown = gr.Dropdown(
                    choices=["docling"],
                    value="docling",
                    label="PDF Processor Type",
                    info="Using basic docling for fast processing"
                )
                category_dropdown = gr.Dropdown(
                    choices=category_choices,
                    value="Sản phẩm nhà thông minh",
                    label="Product Category",
                    info="Select which product category to process"
                )

                gr.Markdown("### 🗄️ MongoDB Configuration")
                # NOTE(review): the placeholder has empty user/password parts
                # ("://:@"); presumably template markers were lost - confirm.
                mongo_connection = gr.Textbox(
                    label="MongoDB Connection String",
                    placeholder="mongodb+srv://:@cluster.mongodb.net/?retryWrites=true&w=majority",
                    value=MONGODB_URI,
                    info="MongoDB connection string"
                )
                mongo_database = gr.Textbox(
                    label="Database Name",
                    placeholder="product_database",
                    value="product_database",
                    info="Name of the MongoDB database"
                )
                test_connection_btn = gr.Button("🔌 Test Connection", size="sm")
                connection_status = gr.Textbox(
                    label="Connection Status",
                    interactive=False,
                    lines=3
                )

            with gr.Column(scale=2):
                output_box = gr.Textbox(
                    lines=15,
                    label="📋 Processing Log",
                    placeholder="Processing results will appear here..."
                )

                gr.Markdown("### 🚀 Actions")
                with gr.Row():
                    process_single_btn = gr.Button("🔄 Process Selected Category", variant="primary")
                    process_all_btn = gr.Button("🔄 Process All Categories", variant="secondary")

                gr.Markdown("### 📖 Information")
                with gr.Accordion("MongoDB Collections", open=False):
                    gr.Markdown("""
                    **📦 Collections**:
                    - `sp_nha_thong_minh` - Sản phẩm nhà thông minh
                    - `sp_chieu_sang` - Đèn LED
                    - `sp_chuyen_dung` - Chiếu sáng chuyên dụng
                    - `sp_thiet_bi_dien` - Thiết bị điện
                    - `sp_phich_nuoc` - Phích nước

                    **🔄 Upsert Logic**:
                    - Existing records are updated based on `Product_ID`
                    - New records are inserted automatically
                    - Timestamps `_created_at` and `_updated_at` are managed automatically
                    """)
                with gr.Accordion("Processor Types", open=False):
                    gr.Markdown("""
                    **🔹 docling**: Basic PDF text extraction
                    - Fast processing
                    - Good for text-based PDFs
                    - No OCR capabilities
                    """)

        # Event handlers
        test_connection_btn.click(
            fn=test_mongo_connection,
            inputs=[mongo_connection, mongo_database],
            outputs=[connection_status]
        )
        process_single_btn.click(
            fn=process_single_category,
            inputs=[
                excel_upload,
                category_dropdown,
                processor_dropdown,
                mongo_connection,
                mongo_database
            ],
            outputs=output_box,
            show_progress=True
        )
        process_all_btn.click(
            fn=process_all_categories,
            inputs=[
                excel_upload,
                processor_dropdown,
                mongo_connection,
                mongo_database
            ],
            outputs=output_box,
            show_progress=True
        )

    return demo


if __name__ == "__main__":
    demo = create_processing_interface()
    demo.launch(share=False, server_name="localhost", server_port=7860)