Spaces:
Sleeping
Sleeping
| import httpx | |
| from fastapi import FastAPI | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from pydantic import BaseModel | |
| from enum import Enum | |
| from transformers import pipeline | |
| from phishing_datasets import submit_entry | |
| from url_tools import extract_urls, resolve_short_url | |
| from urlscan_client import UrlscanClient | |
| import requests | |
| app = FastAPI() | |
| urlscan = UrlscanClient() | |
| class MessageModel(BaseModel): | |
| text: str | |
| class QueryModel(BaseModel): | |
| sender: str | |
| message: MessageModel | |
| class AppModel(BaseModel): | |
| version: str | |
| class InputModel(BaseModel): | |
| _version: int | |
| query: QueryModel | |
| app: AppModel | |
| class ActionModel(Enum): | |
| # Insufficient information to determine an action to take. In a query response, has the effect of allowing the message to be shown normally. | |
| NONE = 0 | |
| # Allow the message to be shown normally. | |
| ALLOW = 1 | |
| # Prevent the message from being shown normally, filtered as Junk message. | |
| JUNK = 2 | |
| # Prevent the message from being shown normally, filtered as Promotional message. | |
| PROMOTION = 3 | |
| # Prevent the message from being shown normally, filtered as Transactional message. | |
| TRANSACTION = 4 | |
| class SubActionModel(Enum): | |
| NONE = 0 | |
| class OutputModel(BaseModel): | |
| action: ActionModel | |
| sub_action: SubActionModel | |
| pipe = pipeline(task="text-classification", model="mrm8488/bert-tiny-finetuned-sms-spam-detection") | |
| def get_well_known_aasa(): | |
| return JSONResponse( | |
| content={ | |
| "messagefilter": { | |
| "apps": [ | |
| "X9NN3FSS3T.com.lela.Serenity.SerenityMessageFilterExtension", | |
| "X9NN3FSS3T.com.lela.Serenity" | |
| ] | |
| } | |
| }, | |
| media_type="application/json" | |
| ) | |
| def get_robot_txt(): | |
| return FileResponse("robot.txt") | |
| def predict(model: InputModel) -> OutputModel: | |
| text = model.query.message.text | |
| urls = extract_urls(text) | |
| results = [urlscan.scan(url) for url in urls] | |
| for result in results: | |
| overall = result.get('verdicts', {}).get('overall', {}) | |
| print(f"Checking verdict: {overall}") | |
| if overall.get('hasVerdicts') and overall.get('score') > 0: | |
| print("Match found. Submitting entry and returning JUNK.") | |
| submit_entry(model.query.sender, model.query.message.text) | |
| return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE) | |
| label = pipe(text) | |
| if label[0]['label'] == 'LABEL_1': | |
| submit_entry(model.query.sender, model.query.message.text) | |
| return OutputModel(action=ActionModel.JUNK, sub_action=SubActionModel.NONE) | |
| else: | |
| return OutputModel(action=ActionModel.NONE, sub_action=SubActionModel.NONE) | |
| class ReportModel(BaseModel): | |
| sender: str | |
| message: str | |
| def report(model: ReportModel): | |
| submit_entry(model.sender, model.message) |