import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

import csv
import glob
import mimetypes
import shutil
import subprocess
import tempfile
import time

import gradio as gr
import stable_whisper
import torch
from stable_whisper.text_output import result_to_any, sec2srt
from yt_dlp import YoutubeDL

# Browser-like User-Agent shared by the YouTube helpers below; used by default
# to reduce SABR/format issues.
BROWSER_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)


def process_media(
    model_size, source_lang, upload, model_type,
    max_chars, max_words, extend_in, extend_out, collapse_gaps,
    max_lines_per_segment, line_penalty, longest_line_char_penalty,
    initial_prompt=None, *args
):
    # Treat an empty prompt as "no prompt".
    if not initial_prompt:
        initial_prompt = None
    start_time = time.time()
    if upload is None:
        return None, None, None, None
    temp_path = upload.name

    if model_type == "faster whisper":
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = stable_whisper.load_faster_whisper(model_size, device=device)
        result = model.transcribe(
            temp_path,
            language=source_lang,
            vad=True,
            regroup=False,
            # denoiser="demucs",
            # batch_size=16,
            initial_prompt=initial_prompt
        )
    else:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = stable_whisper.load_model(model_size, device=device)
        result = model.transcribe(
            temp_path,
            language=source_lang,
            vad=True,
            regroup=False,
            no_speech_threshold=0.9,
            denoiser="demucs",
            initial_prompt=initial_prompt
        )

    # ADVANCED SETTINGS #
    if max_chars or max_words:
        result.split_by_length(
            max_chars=int(max_chars) if max_chars else None,
            max_words=int(max_words) if max_words else None
        )

    # ----- Anti-flickering ----- #
    extend_start = float(extend_in) if extend_in else 0.0
    extend_end = float(extend_out) if extend_out else 0.0
    collapse_gaps_under = float(collapse_gaps) if collapse_gaps else 0.0
    for i in range(len(result) - 1):
        cur = result[i]
        nxt = result[i + 1]
        if nxt.start - cur.end < extend_start + extend_end:
            # Gap too small for both extensions: meet at a weighted midpoint.
            k = extend_end / (extend_start + extend_end) if (extend_start + extend_end) > 0 else 0
            mid = cur.end * (1 - k) + nxt.start * k
            cur.end = nxt.start = mid
        else:
            cur.end += extend_end
            nxt.start -= extend_start
        if nxt.start - cur.end <= collapse_gaps_under:
            cur.end = nxt.start = (cur.end + nxt.start) / 2
    if result:
        result[0].start = max(0, result[0].start - extend_start)
        result[-1].end += extend_end

    # --- Custom SRT block output --- #
    original_filename = os.path.splitext(os.path.basename(temp_path))[0]
    srt_dir = tempfile.gettempdir()
    subtitles_path = os.path.join(srt_dir, f"{original_filename}.srt")
    result_to_any(
        result=result,
        filepath=subtitles_path,
        filetype='srt',
        segments2blocks=lambda segments: segments2blocks(
            segments,
            int(max_lines_per_segment) if max_lines_per_segment else 3,
            float(line_penalty) if line_penalty else 22.01,
            float(longest_line_char_penalty) if longest_line_char_penalty else 1.0
        ),
        word_level=False,
    )
    srt_file_path = subtitles_path
    transcript_txt = result.to_txt()

    mime, _ = mimetypes.guess_type(temp_path)
    audio_out = temp_path if mime and mime.startswith("audio") else None
    video_out = temp_path if mime and mime.startswith("video") else None
    return audio_out, video_out, transcript_txt, srt_file_path
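
# Worked example for the anti-flickering pass above (numbers are illustrative,
# not from the source): with extend_in=0.2 and extend_out=0.3, a gap between
# cur.end=10.0s and nxt.start=10.4s is smaller than 0.5s, so the boundary is
# placed at the weighted midpoint with k = 0.3 / 0.5 = 0.6:
#
#   mid = 10.0 * (1 - 0.6) + 10.4 * 0.6 = 10.24
#
# Both segments then meet at 10.24s, so no brief "no subtitle" flash occurs.
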
def optimize_text(text, max_lines_per_segment, line_penalty, longest_line_char_penalty):
    """Split segment text into at most max_lines_per_segment lines, minimizing
    line_count * line_penalty + longest_line_length * longest_line_char_penalty.
    """
    text = text.strip()
    words = text.split()
    # psum[i] = length of the first i words joined by single spaces, plus one;
    # the length of words[a:b] rendered as one line is psum[b] - psum[a] - 1.
    psum = [0]
    for w in words:
        psum += [psum[-1] + len(w) + 1]
    bestScore = 10 ** 30
    bestSplit = None

    def backtrack(level, wordsUsed, maxLineLength, split):
        nonlocal bestScore, bestSplit
        if wordsUsed == len(words):
            score = level * line_penalty + maxLineLength * longest_line_char_penalty
            if score < bestScore:
                bestScore = score
                bestSplit = split
            return
        if level + 1 == max_lines_per_segment:
            # Last allowed line: put all remaining words on it.
            backtrack(
                level + 1,
                len(words),
                max(maxLineLength, psum[len(words)] - psum[wordsUsed] - 1),
                split + [words[wordsUsed:]]
            )
            return
        for levelWords in range(1, len(words) - wordsUsed + 1):
            backtrack(
                level + 1,
                wordsUsed + levelWords,
                max(maxLineLength, psum[wordsUsed + levelWords] - psum[wordsUsed] - 1),
                split + [words[wordsUsed:wordsUsed + levelWords]]
            )

    backtrack(0, 0, 0, [])
    if not bestSplit:
        return text
    # Fall back to the original text if the split is invalid or would leave a
    # one-word line (which tends to look worse than no split at all).
    if len(bestSplit) > max_lines_per_segment or any(len(line) == 1 for line in bestSplit):
        return text
    optimized = '\n'.join(' '.join(line) for line in bestSplit)
    return optimized


def segment2optimizedsrtblock(segment: dict, idx: int, max_lines_per_segment,
                              line_penalty, longest_line_char_penalty, strip=True) -> str:
    return f'{idx}\n{sec2srt(segment["start"])} --> {sec2srt(segment["end"])}\n' \
           f'{optimize_text(segment["text"], max_lines_per_segment, line_penalty, longest_line_char_penalty)}'


def segments2blocks(segments, max_lines_per_segment, line_penalty, longest_line_char_penalty):
    # SRT block counters conventionally start at 1.
    return '\n\n'.join(
        segment2optimizedsrtblock(s, i, max_lines_per_segment, line_penalty,
                                  longest_line_char_penalty, strip=True)
        for i, s in enumerate(segments, start=1)
    )


def extract_playlist_to_csv(playlist_url, cookies_path=None):
    ydl_opts = {
        'extract_flat': True,
        'quiet': True,
        'dump_single_json': True
    }
    try:
        cookies_path = _normalize_file_path(cookies_path)
        if cookies_path:
            # yt-dlp's Python API expects 'cookiefile' (the CLI flag is --cookies).
            ydl_opts['cookiefile'] = cookies_path
        with YoutubeDL(ydl_opts) as ydl:
            result = ydl.extract_info(playlist_url, download=False)
            entries = result.get('entries', [])
        fd, csv_path = tempfile.mkstemp(suffix=".csv", text=True)
        os.close(fd)
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Title', 'Video ID', 'URL'])
            for video in entries:
                title = video.get('title', 'N/A')
                video_id = video['id']
                url = f'https://www.youtube.com/watch?v={video_id}'
                writer.writerow([title, video_id, url])
        return csv_path
    except Exception:
        return None
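
# Minimal usage sketch for extract_playlist_to_csv (the playlist URL is a
# placeholder, not from the source):
#
#   csv_path = extract_playlist_to_csv(
#       "https://www.youtube.com/playlist?list=PLxxxxxxxxxxxxxxxx")
#   if csv_path:
#       with open(csv_path, encoding="utf-8") as f:
#           print(f.read())
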
not all_files: if any("HTTP Error 429" in e or "429" in e for e in errors): return None, "Error: HTTP 429 Too Many Requests from YouTube. Try again later." err_msg = "; ".join(errors) if errors else "No subtitle files found in Downloads." return None, f"SRT download error: {err_msg}" temp_dir = tempfile.mkdtemp(prefix="ssui_srt_") copied_paths = [] copy_errors = [] for fpath in all_files: try: dest = os.path.join(temp_dir, os.path.basename(fpath)) shutil.copy2(fpath, dest) copied_paths.append(dest) except Exception as e: copy_errors.append(f"{fpath}: {e}") if not copied_paths: msg = "; ".join(copy_errors) if copy_errors else "Failed to copy subtitle files." return None, f"SRT copy error: {msg}" if len(copied_paths) == 1: return copied_paths[0], f"Downloaded subtitle copied to {copied_paths[0]}" zip_base = os.path.join(temp_dir, "srt_files") zip_path = shutil.make_archive(zip_base, "zip", temp_dir) return zip_path, f"Multiple subtitle files archived to {zip_path}" except Exception as e: print("SRT download error:", e) return None, "Saved in Downloads" def _normalize_file_path(file_input): """Normalize a Gradio file return value (or path) to a string path for yt-dlp cookies. Supports strings, file-like objects, and Gradio dict-style file objects. """ if not file_input: return None # Direct string path if isinstance(file_input, str): return file_input # Gradio returns a dict sometimes with a 'name' or 'tmp_path' field if isinstance(file_input, dict): for k in ("name", "tmp_path", "tempfile", "file_path", "path"): if k in file_input and file_input[k]: return file_input[k] return None # File-like objects often have a .name attribute try: return getattr(file_input, "name", None) except Exception: return None def check_youtube_tag(video_url, tag_to_check, cookies_path=None): try: cookies_path = _normalize_file_path(cookies_path) ydl_opts = {"quiet": True} if cookies_path: ydl_opts["cookies"] = cookies_path # Use a browser-like User-Agent by default to reduce SABR/format issues ydl_opts.setdefault("http_headers", {}) ydl_opts["http_headers"].setdefault("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36") with YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(video_url, download=False) tags = info.get('tags', []) tag_to_check_norm = tag_to_check.lower() tags_norm = [t.lower() for t in tags] # Exact match, case-insensitive, apostrophe style must match exists = any(tag_to_check_norm == t for t in tags_norm) if exists: return f"Tag/s '{tag_to_check}' EXISTS in video" else: return f"Tag/s '{tag_to_check}' DOES NOT EXIST in video.\n\nTags found: {tags if tags else 'None'}" except Exception as e: err = str(e) if 'Sign in to confirm your age' in err or ('Sign in' in err and 'age' in err): return f"Error checking {video_url}: This video is age-restricted and requires authentication (provide a cookies.txt file)." if 'HTTP Error 403' in err or '403' in err: return f"Error checking {video_url}: HTTP 403 Forbidden - try supplying a cookies file or updating yt-dlp with `yt-dlp -U`." 
return f"Error checking {video_url}: {err}" def check_playlist_tags(playlist_url, tag_to_check, cookies_path=None): import tempfile, csv try: cookies_path = _normalize_file_path(cookies_path) ydl_opts = { 'extract_flat': True, 'quiet': True, 'dump_single_json': True } if cookies_path: ydl_opts['cookies'] = cookies_path # Use browser user agent ydl_opts.setdefault("http_headers", {}) ydl_opts["http_headers"].setdefault("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36") with YoutubeDL(ydl_opts) as ydl: result = ydl.extract_info(playlist_url, download=False) entries = result.get('entries', []) rows = [] tag_to_check_norm = tag_to_check.lower() for video in entries: video_id = video.get('id') if not video_id: title = video.get('title', 'N/A') rows.append([title, '', 'No video ID in playlist entry']) continue video_url = f'https://www.youtube.com/watch?v={video_id}' title = video.get('title', 'N/A') video_opts = {'quiet': True} if cookies_path: video_opts['cookies'] = cookies_path # Add a user agent video_opts.setdefault("http_headers", {}) video_opts["http_headers"].setdefault("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36") try: with YoutubeDL(video_opts) as ydl_video: info = ydl_video.extract_info(video_url, download=False) # Detect unlisted flag if available is_unlisted = info.get('is_unlisted') if isinstance(info, dict) else False # Detect private, membership or age-limit fields if present is_private = info.get('is_private') if isinstance(info, dict) and 'is_private' in info else False age_limit = info.get('age_limit') if isinstance(info, dict) and 'age_limit' in info else 0 # Tags processing tags = info.get('tags', []) or [] tags_norm = [t.lower() for t in tags] exists = any(tag_to_check_norm == t for t in tags_norm) # Build note components parts = [] if is_unlisted: parts.append('Unlisted') if is_private: parts.append('Private') elif age_limit and int(age_limit) >= 18: parts.append('Age-restricted') if exists: parts.append(f"Tag/s '{tag_to_check}' exists in video") else: parts.append('Tag/s does not exist in video') note = '; '.join(parts) rows.append([title, video_url, note]) except Exception as e: err = str(e) err_lower = err.lower() if 'sign in to confirm your age' in err_lower or ('age' in err_lower and 'sign in' in err_lower): note = 'Age-restricted - cookies required or signed-in account needed' elif 'private' in err_lower and 'video' in err_lower: note = 'Private video - access denied' elif 'video unavailable' in err_lower or 'not available' in err_lower or 'removed' in err_lower: note = 'Video unavailable or removed' elif '403' in err_lower or 'forbidden' in err_lower: note = 'HTTP Error 403 Forbidden - cookies may be required or access denied' else: note = f"Could not check video: {err}" rows.append([title, video_url, note]) # Write to temp CSV fd, csv_path = tempfile.mkstemp(suffix=".csv", text=True) os.close(fd) with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(["Title", "URL", "Notes"]) writer.writerows(rows) return csv_path except Exception as e: # Write error to CSV fd, csv_path = tempfile.mkstemp(suffix=".csv", text=True) os.close(fd) with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(["Title", "URL", "Notes"]) writer.writerow(["Error", "", str(e)]) return csv_path WHISPER_LANGUAGES = [ ("Afrikaans", "af"), ("Albanian", 
"sq"), ("Amharic", "am"), ("Arabic", "ar"), ("Armenian", "hy"), ("Assamese", "as"), ("Azerbaijani", "az"), ("Bashkir", "ba"), ("Basque", "eu"), ("Belarusian", "be"), ("Bengali", "bn"), ("Bosnian", "bs"), ("Breton", "br"), ("Bulgarian", "bg"), ("Burmese", "my"), ("Catalan", "ca"), ("Chinese", "zh"), ("Croatian", "hr"), ("Czech", "cs"), ("Danish", "da"), ("Dutch", "nl"), ("English", "en"), ("Estonian", "et"), ("Faroese", "fo"), ("Finnish", "fi"), ("French", "fr"), ("Galician", "gl"), ("Georgian", "ka"), ("German", "de"), ("Greek", "el"), ("Gujarati", "gu"), ("Haitian Creole", "ht"), ("Hausa", "ha"), ("Hebrew", "he"), ("Hindi", "hi"), ("Hungarian", "hu"), ("Icelandic", "is"), ("Indonesian", "id"), ("Italian", "it"), ("Japanese", "ja"), ("Javanese", "jv"), ("Kannada", "kn"), ("Kazakh", "kk"), ("Khmer", "km"), ("Korean", "ko"), ("Lao", "lo"), ("Latin", "la"), ("Latvian", "lv"), ("Lingala", "ln"), ("Lithuanian", "lt"), ("Luxembourgish", "lb"), ("Macedonian", "mk"), ("Malagasy", "mg"), ("Malay", "ms"), ("Malayalam", "ml"), ("Maltese", "mt"), ("Maori", "mi"), ("Marathi", "mr"), ("Mongolian", "mn"), ("Nepali", "ne"), ("Norwegian", "no"), ("Nyanja", "ny"), ("Occitan", "oc"), ("Pashto", "ps"), ("Persian", "fa"), ("Polish", "pl"), ("Portuguese", "pt"), ("Punjabi", "pa"), ("Romanian", "ro"), ("Russian", "ru"), ("Sanskrit", "sa"), ("Serbian", "sr"), ("Shona", "sn"), ("Sindhi", "sd"), ("Sinhala", "si"), ("Slovak", "sk"), ("Slovenian", "sl"), ("Somali", "so"), ("Spanish", "es"), ("Sundanese", "su"), ("Swahili", "sw"), ("Swedish", "sv"), ("Tagalog", "tl"), ("Tajik", "tg"), ("Tamil", "ta"), ("Tatar", "tt"), ("Telugu", "te"), ("Thai", "th"), ("Turkish", "tr"), ("Turkmen", "tk"), ("Ukrainian", "uk"), ("Urdu", "ur"), ("Uzbek", "uz"), ("Vietnamese", "vi"), ("Welsh", "cy"), ("Yiddish", "yi"), ("Yoruba", "yo"), ] with gr.Blocks() as interface: gr.HTML( """
Hosted on 🤗 Hugging Face Spaces
""" ) gr.Markdown( """ This is a Gradio UI app that combines AI-powered speech and language processing technologies. This app supports the following features: - Speech-to-text (WhisperAI) - Language translation (GPT-4) (In progress) - Improved transcription (GPT-4) (In progress) - Text to Speech (In progress) UPDATE: The app now includes Youtube metadata extraction features: (title / URL / ID, subtitles, tag checking) NOTE: This app is currently in the process of applying other AI-solutions for other use cases. """ ) with gr.Tabs(): with gr.TabItem("Speech to Text"): gr.HTML("