Spaces:
Sleeping
Sleeping
"""
UrlscanClient: A simple Python client for interacting with the urlscan.io API.

This client allows you to:
    - Submit a URL to be scanned.
    - Retrieve scan results by UUID.
    - Search existing scans with a query.
    - Perform a full scan workflow with error handling.

Environment Variable:
    URLSCAN_API_KEY (str): If not passed during initialization, the client will attempt to use this environment variable.

Dependencies:
    - requests
    - os
    - time
"""
| import os | |
| import time | |
| import requests | |
class UrlscanClient:
    """
    A client to interact with the urlscan.io API for submitting URLs, retrieving scan results,
    and searching the scan database.

    Class attributes:
        BASE_URL (str): Root of the urlscan.io v1 API.
        REQUEST_TIMEOUT (int): Per-request network timeout in seconds, applied to every HTTP call
            so a stalled connection cannot block the caller forever.
        POLL_INTERVAL (int): Seconds to sleep between polls while waiting for a scan result.
        DEFAULT_COUNTRY (str): Country sent with a submission unless overridden via ``country=``.
    """

    BASE_URL = "https://urlscan.io/api/v1"
    REQUEST_TIMEOUT = 30
    POLL_INTERVAL = 5
    DEFAULT_COUNTRY = "fr"

    def __init__(self, api_key=None):
        """
        Initialize the UrlscanClient.

        Parameters:
            api_key (str, optional): Your urlscan.io API key. If not provided, it is read from
                the URLSCAN_API_KEY environment variable.

        Raises:
            ValueError: If the API key is not provided or found in environment variables.
        """
        self.api_key = api_key or os.getenv("URLSCAN_API_KEY")
        if not self.api_key:
            raise ValueError("API key is required. Set it via parameter or the URLSCAN_API_KEY environment variable.")
        self.headers = {
            "API-Key": self.api_key,
            "Content-Type": "application/json",
        }

    def submit_url(self, url, visibility="public", tags=None, **options):
        """
        Submit a URL for scanning.

        Parameters:
            url (str): The URL to scan.
            visibility (str): Scan visibility ('public', 'unlisted', or 'private'). Defaults to 'public'.
            tags (list, optional): Optional list of tags to associate with the scan.
            **options: Additional scan options like 'useragent', 'referer', 'country', etc.
                Options are applied last, so they override the defaults built here
                (including the default country, DEFAULT_COUNTRY).

        Returns:
            dict: JSON response from the submission API.

        Raises:
            requests.HTTPError: If the request fails.
            requests.Timeout: If the server does not answer within REQUEST_TIMEOUT seconds.
        """
        payload = {
            "url": url,
            "visibility": visibility,
            "country": self.DEFAULT_COUNTRY,
            "tags": tags or [],
        }
        payload.update(options)
        response = requests.post(
            f"{self.BASE_URL}/scan/",
            headers=self.headers,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def get_result(self, uuid, wait=True, timeout=60):
        """
        Retrieve the result of a scan by UUID.

        A 404 response is treated as "result not ready yet" and polled again
        every POLL_INTERVAL seconds until ``timeout`` elapses.

        Parameters:
            uuid (str): The UUID of the scan result.
            wait (bool): Whether to wait for the scan to complete if it's not yet ready. Defaults to True.
            timeout (int): Maximum time (in seconds) to wait for the result if wait is True. Defaults to 60.

        Returns:
            dict: The scan result data.

        Raises:
            TimeoutError: If the result is not ready within the timeout period
                (or immediately, when wait is False and the result is not ready).
            requests.HTTPError: If another HTTP error occurs, or the server answers
                with an unexpected non-error status.
            requests.Timeout: If a single request exceeds REQUEST_TIMEOUT seconds.
        """
        result_url = f"{self.BASE_URL}/result/{uuid}/"
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while True:
            response = requests.get(
                result_url,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
            if response.status_code != 404:
                response.raise_for_status()
                # raise_for_status() is a no-op for non-error codes (e.g. 202/204);
                # without this raise the loop would spin forever without sleeping.
                raise requests.HTTPError(
                    f"Unexpected status code {response.status_code} for {result_url}",
                    response=response,
                )
            remaining = deadline - time.monotonic()
            if not wait or remaining <= 0:
                raise TimeoutError("Scan result not available yet.")
            # Never sleep past the deadline.
            time.sleep(min(self.POLL_INTERVAL, remaining))

    def search(self, query, size=10):
        """
        Search for past scans using a query string.

        Parameters:
            query (str): The search query, such as a domain name or IP address.
            size (int): Maximum number of results to return. Defaults to 10.

        Returns:
            dict: Search results from urlscan.io.

        Raises:
            requests.HTTPError: If the request fails.
            requests.Timeout: If the server does not answer within REQUEST_TIMEOUT seconds.
        """
        params = {"q": query, "size": size}
        response = requests.get(
            f"{self.BASE_URL}/search/",
            headers=self.headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def scan(self, url: str):
        """
        Convenience method to submit a scan and retrieve the result.

        Progress is reported on stdout. Network/HTTP failures and a result
        that does not become available in time both yield the fallback result
        instead of raising.

        Parameters:
            url (str): The URL to scan.

        Returns:
            dict: The scan result, or a fallback result if the scan fails.
        """
        try:
            print(f"Submit url {url}")
            submission = self.submit_url(url=url, visibility="public")
            print(f"Submitted scan. UUID: {submission['uuid']}")
            result = self.get_result(submission["uuid"])
            print(f"Submission succeed. UUID: {submission['uuid']}")
            return result
        # get_result() signals a poll timeout with the builtin TimeoutError, which is
        # not a RequestException; it must be caught here to honour the fallback contract.
        except (requests.exceptions.RequestException, TimeoutError) as e:
            print(f"Submission failed {e}")
            return {
                'page': {'url': url},
                'verdicts': {'overall': {'hasVerdicts': False, 'score': 0}},
            }