# app.py
import os
from flask import Flask, render_template, request, jsonify
import hashlib
import sqlite3
from models.claim_extractor import ClaimExtractor
from models.keyword_extractor import KeywordExtractor
from models.evidence_retriever import EvidenceRetriever
from models.nli_classifier import NLIClassifier
from utils.similarity import calculate_similarity
from utils.config import Config
# Initialize models globally
claim_extractor = ClaimExtractor()
keyword_extractor = KeywordExtractor()
evidence_retriever = EvidenceRetriever()
nli_classifier = NLIClassifier()
class TruthCheckSystem:
    def __init__(self):
        self.claim_extractor = claim_extractor
        self.keyword_extractor = keyword_extractor
        self.evidence_retriever = evidence_retriever
        self.nli_classifier = nli_classifier
        self.cache = {}

    def _get_cache_key(self, text):
        """Generate cache key for claim"""
        return hashlib.md5(text.encode()).hexdigest()
    def verify_claim(self, text):
        """
        Enhanced fact verification with multi-evidence aggregation
        and consensus mechanism (similar to FactCheck system)
        """
        try:
            # Check cache
            cache_key = self._get_cache_key(text)
            if cache_key in self.cache:
                print("Returning cached result")
                return self.cache[cache_key]

            # Step 1: Extract claims
            claims = self.claim_extractor.extract_claims(text)
            if not claims:
                result = ("Low Confidence", 0.3, "No valid claims found. Please provide a clear factual statement.")
                self.cache[cache_key] = result
                return result
            claim = claims[0]
            # Step 2: Extract keywords
            keywords = self.keyword_extractor.extract_keywords(claim)

            # Step 3: Retrieve evidence from multiple sources
            evidence_items = self.evidence_retriever.get_evidence(keywords)
            if not evidence_items:
                result = ("Low Confidence", 0.3, "Not enough reliable evidence found.")
                self.cache[cache_key] = result
                return result

            # Step 4: Filter by semantic similarity
            relevant_evidence = []
            for item in evidence_items:
                similarity = calculate_similarity(claim, item['content'])
                if similarity > Config.SIMILARITY_THRESHOLD:
                    item['similarity_score'] = similarity
                    relevant_evidence.append(item)
            if not relevant_evidence:
                result = ("Low Confidence", 0.4, "No semantically relevant evidence found.")
                self.cache[cache_key] = result
                return result

            # Step 5: Sort by combined score (credibility + similarity)
            for item in relevant_evidence:
                item['combined_score'] = (
                    item.get('credibility_score', 0.5) * 0.6 +
                    item.get('similarity_score', 0.5) * 0.4
                )
            relevant_evidence.sort(key=lambda x: x['combined_score'], reverse=True)
            # Step 6: Multi-Evidence NLI with Consensus Mechanism
            # Use top 4 evidence sources (as per FactCheck research)
            top_evidence = relevant_evidence[:4]
            nli_results = []
            for evidence_item in top_evidence:
                nli_result = self.nli_classifier.classify(claim, evidence_item['content'])
                nli_results.append({
                    'nli': nli_result,
                    'credibility': evidence_item.get('credibility_score', 0.5),
                    'similarity': evidence_item.get('similarity_score', 0.5),
                    'source': evidence_item.get('source', 'Unknown'),
                    'url': evidence_item.get('url', '')
                })

            # Step 7: Weighted Consensus Voting
            entailment_score = 0
            contradiction_score = 0
            neutral_score = 0
            total_weight = 0
            # Use a distinct loop variable so the final `result` tuple is not shadowed
            for nli_res in nli_results:
                # Weight by credibility and confidence
                weight = nli_res['credibility'] * nli_res['nli']['confidence']
                total_weight += weight
                if nli_res['nli']['label'] == 'ENTAILMENT':
                    entailment_score += weight
                elif nli_res['nli']['label'] == 'CONTRADICTION':
                    contradiction_score += weight
                else:
                    neutral_score += weight

            # Normalize scores
            if total_weight > 0:
                entailment_score /= total_weight
                contradiction_score /= total_weight
                neutral_score /= total_weight
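            # Worked example of the weighted vote (illustrative numbers, not real sources):
            #   source A: credibility 0.9, ENTAILMENT at confidence 0.8 -> weight 0.72
            #   source B: credibility 0.6, NEUTRAL    at confidence 0.5 -> weight 0.30
            #   total_weight = 1.02, so entailment_score ≈ 0.71 and neutral_score ≈ 0.29;
            #   0.71 clears the 0.6 consensus threshold applied in Step 8 below.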
            # Step 8: Determine final label with consensus threshold
            consensus_threshold = 0.6  # Require 60% agreement
            max_score = max(entailment_score, contradiction_score, neutral_score)
            if max_score == entailment_score and entailment_score >= consensus_threshold:
                label = "True"
                final_confidence = entailment_score
            elif max_score == contradiction_score and contradiction_score >= consensus_threshold:
                label = "False"
                final_confidence = contradiction_score
            else:
                label = "Low Confidence"
                final_confidence = max_score

            # Step 9: Prepare evidence summary
            evidence_summary = self._format_evidence_summary(nli_results, top_evidence)
            result = (label, final_confidence, evidence_summary)

            # Cache result
            self.cache[cache_key] = result
            return result
        except Exception as e:
            print(f"Error during claim verification: {e}")
            import traceback
            traceback.print_exc()
            return ("Error", 0.0, f"An internal error occurred: {str(e)}")
    def _format_evidence_summary(self, nli_results, evidence_items):
        """Format evidence summary with sources and verdicts"""
        summary_parts = []
        summary_parts.append(f"**Analyzed {len(nli_results)} sources:**\n")
        for i, (nli_res, evidence) in enumerate(zip(nli_results, evidence_items), 1):
            source = nli_res['source']
            verdict = nli_res['nli']['label']
            confidence = nli_res['nli']['confidence']
            credibility = nli_res['credibility']
            url = nli_res['url']
            # Get snippet
            content = evidence.get('content', '')[:300]
            summary_parts.append(
                f"\n**Source {i}: {source}**\n"
                f"Verdict: {verdict} (Confidence: {confidence:.2%})\n"
                f"Credibility Score: {credibility:.2f}\n"
                f"Excerpt: {content}...\n"
                f"URL: {url}\n"
            )
        return "\n".join(summary_parts)
# Initialize system
truthcheck_system_instance = TruthCheckSystem()
DB_PATH = os.path.join(os.getcwd(), 'history.db')
def init_db():
    """Initialize SQLite database"""
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS verifications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            claim TEXT NOT NULL,
            label TEXT NOT NULL,
            confidence REAL,
            date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()
init_db()
def create_app():
    app = Flask(__name__, static_folder='static', template_folder='templates')
    app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', Config.SECRET_KEY)
    app.config['DEBUG'] = Config.DEBUG

    @app.route('/')
    def index():
        return render_template('index.html')

    @app.route('/how-it-works')
    def how_it_works():
        return render_template('how_it_works.html')

    @app.route('/api-docs')
    def api_docs():
        return render_template('api.html')

    @app.route('/dashboard')
    def dashboard():
        return render_template('dashboard.html')

    @app.route('/api/history')
    def get_history():
        try:
            conn = sqlite3.connect(DB_PATH)
            conn.row_factory = sqlite3.Row
            c = conn.cursor()
            c.execute('SELECT * FROM verifications ORDER BY date DESC LIMIT 50')
            rows = c.fetchall()
            conn.close()
            history = []
            for row in rows:
                history.append({
                    'id': row['id'],
                    'claim': row['claim'],
                    'label': row['label'],
                    'confidence': row['confidence'],
                    'date': row['date']
                })
            return jsonify(history)
        except Exception as e:
            return jsonify({'error': str(e)}), 500
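    # Example request to the verification endpoint below (illustrative values):
    #   POST /api/verify with JSON body {"claim": "The Eiffel Tower is in Paris."}
    # Example response shape:
    #   {"label": "True", "confidence": 0.873, "evidence": "...", "claim": "..."}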
    @app.route('/api/verify', methods=['POST'])
    def verify_claim_api():
        try:
            # silent=True avoids an exception on a missing or malformed JSON body
            data = request.get_json(silent=True) or {}
            claim_text = data.get('claim', '').strip()
            if not claim_text:
                return jsonify({'error': 'No claim provided'}), 400
            label, confidence, evidence = truthcheck_system_instance.verify_claim(claim_text)
            # Save to DB
            try:
                conn = sqlite3.connect(DB_PATH)
                c = conn.cursor()
                c.execute('INSERT INTO verifications (claim, label, confidence) VALUES (?, ?, ?)',
                          (claim_text, label, float(confidence)))
                conn.commit()
                conn.close()
            except Exception as e:
                print(f"DB Error: {e}")
            result = {
                'label': label,
                'confidence': round(confidence, 3),
                'evidence': evidence,
                'claim': claim_text
            }
            return jsonify(result)
        except Exception as e:
            print(f"API error: {e}")
            return jsonify({'error': f'Server error: {str(e)}'}), 500
    @app.route('/health')
    def health_check():
        return jsonify({'status': 'healthy', 'message': 'TruthCheck is running.'})

    return app
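
# A minimal sketch of a local entry point, assuming the Flask development server
# is acceptable and that the port comes from a PORT environment variable
# (defaulting to 5000); a production deployment would typically point a WSGI
# server such as gunicorn at create_app() instead.
if __name__ == '__main__':
    app = create_app()
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)))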