Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| from flask import Flask, render_template, request, jsonify | |
| from functools import lru_cache | |
| import hashlib | |
| import sqlite3 | |
| import datetime | |
| import json | |
| from models.claim_extractor import ClaimExtractor | |
| from models.keyword_extractor import KeywordExtractor | |
| from models.evidence_retriever import EvidenceRetriever | |
| from models.nli_classifier import NLIClassifier | |
| from utils.similarity import calculate_similarity | |
| from utils.config import Config | |
# Shared model singletons: each one is constructed exactly once, at import
# time, and handed to TruthCheckSystem instances rather than rebuilt per use.
claim_extractor = ClaimExtractor()
keyword_extractor = KeywordExtractor()
evidence_retriever = EvidenceRetriever()
nli_classifier = NLIClassifier()
class TruthCheckSystem:
    """Claim-verification pipeline with multi-evidence consensus.

    ``verify_claim`` runs these stages in order: claim extraction, keyword
    extraction, evidence retrieval, similarity filtering, ranking, NLI
    classification of the top-ranked evidence, weighted voting, and summary
    formatting. Finished results are memoised in ``self.cache`` keyed by an
    MD5 digest of the input text.
    """

    # Upper bound on memoised results; the oldest entry is evicted first so a
    # long-running server cannot grow the cache without limit.
    MAX_CACHE_ENTRIES = 256
    # Number of top-ranked evidence items passed to the NLI classifier.
    TOP_EVIDENCE_COUNT = 4
    # Minimum normalised vote share required for a "True"/"False" verdict.
    CONSENSUS_THRESHOLD = 0.6
    # Relative weights used when ranking evidence.
    CREDIBILITY_WEIGHT = 0.6
    SIMILARITY_WEIGHT = 0.4
    # Maximum number of characters of evidence content quoted per source.
    EXCERPT_LENGTH = 300

    def __init__(self):
        """Bind the module-level model singletons and start with an empty cache."""
        self.claim_extractor = claim_extractor
        self.keyword_extractor = keyword_extractor
        self.evidence_retriever = evidence_retriever
        self.nli_classifier = nli_classifier
        self.cache = {}

    def _get_cache_key(self, text):
        """Generate cache key for claim"""
        # MD5 is used purely as a fast fingerprint here, not for security.
        return hashlib.md5(text.encode()).hexdigest()

    def _remember(self, cache_key, result):
        """Store ``result`` under ``cache_key``, evict overflow, return ``result``."""
        self.cache[cache_key] = result
        while len(self.cache) > self.MAX_CACHE_ENTRIES:
            try:
                # dicts iterate in insertion order, so the first key is the oldest.
                oldest = next(iter(self.cache))
                self.cache.pop(oldest, None)
            except (StopIteration, RuntimeError):
                # The dict changed underneath us (e.g. another request thread);
                # eviction is best-effort, so give up for this call.
                break
        return result

    def verify_claim(self, text):
        """
        Enhanced fact verification with multi-evidence aggregation
        and consensus mechanism (similar to FactCheck system)

        Args:
            text: Raw user text containing the statement to verify.

        Returns:
            A ``(label, confidence, summary)`` tuple. ``label`` is one of
            "True", "False", "Low Confidence" or "Error". Any exception raised
            while processing is caught and reported as an "Error" tuple, which
            is not cached; every other outcome (including the early
            "Low Confidence" exits) is cached.
        """
        try:
            # Check cache (single lookup; results are tuples, never None)
            cache_key = self._get_cache_key(text)
            cached = self.cache.get(cache_key)
            if cached is not None:
                print("Returning cached result")
                return cached

            # Step 1: Extract claims; only the first extracted claim is verified
            claims = self.claim_extractor.extract_claims(text)
            if not claims:
                return self._remember(
                    cache_key,
                    ("Low Confidence", 0.3, "No valid claims found. Please provide a clear factual statement."),
                )
            claim = claims[0]

            # Step 2: Extract keywords
            keywords = self.keyword_extractor.extract_keywords(claim)

            # Step 3: Retrieve evidence from multiple sources
            evidence_items = self.evidence_retriever.get_evidence(keywords)
            if not evidence_items:
                return self._remember(
                    cache_key,
                    ("Low Confidence", 0.3, "Not enough reliable evidence found."),
                )

            # Steps 4-5: Filter by semantic similarity, then rank
            relevant_evidence = self._select_relevant_evidence(claim, evidence_items)
            if not relevant_evidence:
                return self._remember(
                    cache_key,
                    ("Low Confidence", 0.4, "No semantically relevant evidence found."),
                )

            # Step 6: Multi-evidence NLI on the best-ranked items
            top_evidence = relevant_evidence[:self.TOP_EVIDENCE_COUNT]
            nli_results = self._classify_evidence(claim, top_evidence)

            # Steps 7-8: Weighted consensus voting and final label
            label, final_confidence = self._consensus(nli_results)

            # Step 9: Prepare evidence summary
            evidence_summary = self._format_evidence_summary(nli_results, top_evidence)
            return self._remember(cache_key, (label, final_confidence, evidence_summary))
        except Exception as e:
            print(f"Error during claim verification: {e}")
            import traceback
            traceback.print_exc()
            return ("Error", 0.0, f"An internal error occurred: {str(e)}")

    def _select_relevant_evidence(self, claim, evidence_items):
        """Keep evidence above the similarity threshold, best combined score first.

        Each kept item is a shallow copy of the retrieved dict, extended with
        ``similarity_score`` and ``combined_score``, so the retriever's own
        objects are left untouched.
        """
        relevant = []
        for item in evidence_items:
            similarity = calculate_similarity(claim, item['content'])
            if similarity > Config.SIMILARITY_THRESHOLD:
                scored = dict(item)
                scored['similarity_score'] = similarity
                scored['combined_score'] = (
                    scored.get('credibility_score', 0.5) * self.CREDIBILITY_WEIGHT +
                    similarity * self.SIMILARITY_WEIGHT
                )
                relevant.append(scored)
        relevant.sort(key=lambda x: x['combined_score'], reverse=True)
        return relevant

    def _classify_evidence(self, claim, top_evidence):
        """Run the NLI classifier on each evidence item and attach its metadata."""
        return [
            {
                'nli': self.nli_classifier.classify(claim, evidence_item['content']),
                'credibility': evidence_item.get('credibility_score', 0.5),
                'similarity': evidence_item.get('similarity_score', 0.5),
                'source': evidence_item.get('source', 'Unknown'),
                'url': evidence_item.get('url', ''),
            }
            for evidence_item in top_evidence
        ]

    def _consensus(self, nli_results):
        """Combine per-source NLI verdicts into ``(label, confidence)``.

        Each vote is weighted by source credibility times NLI confidence. Any
        label other than ENTAILMENT or CONTRADICTION counts as neutral.
        """
        entailment_score = 0
        contradiction_score = 0
        neutral_score = 0
        total_weight = 0
        for result in nli_results:
            weight = result['credibility'] * result['nli']['confidence']
            total_weight += weight
            verdict = result['nli']['label']
            if verdict == 'ENTAILMENT':
                entailment_score += weight
            elif verdict == 'CONTRADICTION':
                contradiction_score += weight
            else:
                neutral_score += weight

        # Normalize scores; with zero total weight all scores simply stay 0
        if total_weight > 0:
            entailment_score /= total_weight
            contradiction_score /= total_weight
            neutral_score /= total_weight

        max_score = max(entailment_score, contradiction_score, neutral_score)
        if max_score == entailment_score and entailment_score >= self.CONSENSUS_THRESHOLD:
            return "True", entailment_score
        if max_score == contradiction_score and contradiction_score >= self.CONSENSUS_THRESHOLD:
            return "False", contradiction_score
        return "Low Confidence", max_score

    def _format_evidence_summary(self, nli_results, evidence_items):
        """Format evidence summary with sources and verdicts

        ``nli_results`` and ``evidence_items`` are paired positionally. The
        excerpt is followed by "..." only when the content was actually cut.
        """
        summary_parts = [f"**Analyzed {len(nli_results)} sources:**\n"]
        for i, (nli_res, evidence) in enumerate(zip(nli_results, evidence_items), 1):
            source = nli_res['source']
            verdict = nli_res['nli']['label']
            confidence = nli_res['nli']['confidence']
            credibility = nli_res['credibility']
            url = nli_res['url']

            # Get snippet
            content = evidence.get('content', '')
            excerpt = content[:self.EXCERPT_LENGTH]
            if len(content) > self.EXCERPT_LENGTH:
                excerpt += "..."

            summary_parts.append(
                f"\n**Source {i}: {source}**\n"
                f"Verdict: {verdict} (Confidence: {confidence:.2%})\n"
                f"Credibility Score: {credibility:.2f}\n"
                f"Excerpt: {excerpt}\n"
                f"URL: {url}\n"
            )
        return "\n".join(summary_parts)
# One verifier instance for the whole module (the verify endpoint calls it),
# followed by the SQLite history file location, which is resolved against the
# process working directory at import time.
truthcheck_system_instance = TruthCheckSystem()
DB_PATH = os.path.join(os.getcwd(), 'history.db')
def init_db(db_path=None):
    """Initialize SQLite database

    Creates the ``verifications`` table if it does not already exist, so the
    call is safe to repeat.

    Args:
        db_path: Optional path of the SQLite file to initialise. When omitted
            (the original behaviour) the module-level ``DB_PATH`` is used.
    """
    target = DB_PATH if db_path is None else db_path
    conn = sqlite3.connect(target)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS verifications (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                claim TEXT NOT NULL,
                label TEXT NOT NULL,
                confidence REAL,
                date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Always release the file handle, even if the DDL or commit fails.
        conn.close()
# Create the history table at import time, before create_app() is even defined.
init_db()
def create_app():
    """Build and configure the TruthCheck Flask application.

    Registers four template pages, a JSON history endpoint, a JSON
    claim-verification endpoint and a health check, then returns the app.

    Returns:
        The configured ``Flask`` application object.
    """
    app = Flask(__name__, static_folder='static', template_folder='templates')
    app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', Config.SECRET_KEY)
    app.config['DEBUG'] = Config.DEBUG

    # NOTE(review): the view functions below had no route registration in the
    # reviewed source, so the returned app served nothing. The URL rules here
    # are inferred from the handler names -- confirm each path against the
    # templates / front-end JavaScript before deploying.

    @app.route('/')
    def index():
        """Render the landing page."""
        return render_template('index.html')

    @app.route('/how-it-works')
    def how_it_works():
        """Render the explanatory page."""
        return render_template('how_it_works.html')

    @app.route('/api-docs')
    def api_docs():
        """Render the API documentation page."""
        return render_template('api.html')

    @app.route('/dashboard')
    def dashboard():
        """Render the dashboard page."""
        return render_template('dashboard.html')

    @app.route('/api/history')
    def get_history():
        """Return the 50 most recent verifications as a JSON list."""
        try:
            conn = sqlite3.connect(DB_PATH)
            try:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(
                    'SELECT * FROM verifications ORDER BY date DESC LIMIT 50'
                ).fetchall()
            finally:
                # Close even when the query fails so the handle is not leaked.
                conn.close()
            history = [
                {
                    'id': row['id'],
                    'claim': row['claim'],
                    'label': row['label'],
                    'confidence': row['confidence'],
                    'date': row['date'],
                }
                for row in rows
            ]
            return jsonify(history)
        except Exception as e:
            return jsonify({'error': str(e)}), 500

    @app.route('/api/verify', methods=['POST'])
    def verify_claim_api():
        """Verify the ``claim`` field of a JSON body and record the outcome.

        Responds 400 when the body is not a JSON object or carries no
        non-blank string claim, and 500 on unexpected failures. A failure to
        write the history row is logged but does not fail the request.
        """
        try:
            # silent=True yields None for a missing/invalid JSON body instead
            # of raising, so malformed requests get a 400 rather than a 500.
            data = request.get_json(silent=True)
            claim_text = data.get('claim', '') if isinstance(data, dict) else ''
            if not isinstance(claim_text, str) or not claim_text.strip():
                return jsonify({'error': 'No claim provided'}), 400

            label, confidence, evidence = truthcheck_system_instance.verify_claim(claim_text)

            # Save to DB (best effort)
            try:
                conn = sqlite3.connect(DB_PATH)
                try:
                    conn.execute(
                        'INSERT INTO verifications (claim, label, confidence) VALUES (?, ?, ?)',
                        (claim_text, label, float(confidence)),
                    )
                    conn.commit()
                finally:
                    conn.close()
            except Exception as e:
                print(f"DB Error: {e}")

            result = {
                'label': label,
                'confidence': round(confidence, 3),
                'evidence': evidence,
                'claim': claim_text
            }
            return jsonify(result)
        except Exception as e:
            print(f"API error: {e}")
            return jsonify({'error': f'Server error: {str(e)}'}), 500

    @app.route('/health')
    def health_check():
        """Report liveness as a small JSON document."""
        return jsonify({'status': 'healthy', 'message': 'TruthCheck is running.'})

    return app