Spaces:
Running
Running
| import logging | |
| import os | |
| import re | |
| import shelve | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| import gradio as gr | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from dotenv import load_dotenv | |
| # Import functions from your scripts (assuming they are structured appropriately) | |
| # It's often better to refactor scripts into functions for easier import | |
| try: | |
| from huggingface_hub import InferenceClient | |
| from src.create_presentation import ( | |
| DEFAULT_LLM_MODEL, | |
| DEFAULT_PRESENTATION_PROMPT_TEMPLATE, | |
| generate_presentation_with_llm, | |
| ) | |
| from src.create_video import ( | |
| cleanup_temp_files, | |
| concatenate_clips, | |
| convert_pdf_to_images, | |
| create_video_clips, | |
| find_audio_files, | |
| ) | |
| from src.transcription_to_audio import VOICE_ID, text_to_speech | |
| except ImportError as e: | |
| print(f"Error importing script functions: {e}") | |
| print("Please ensure scripts are in the 'src' directory and structured correctly.") | |
| exit(1) | |
# Load variables from a local .env file before any os.getenv() lookup below.
load_dotenv()

# --- Configuration & Setup ---
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Credentials and model settings; each can be overridden via the environment.
HF_API_KEY = os.getenv("HF_API_KEY")
LLM_MODEL = os.getenv("LLM_MODEL", DEFAULT_LLM_MODEL)
PRESENTATION_PROMPT = os.getenv(
    "PRESENTATION_PROMPT",
    DEFAULT_PRESENTATION_PROMPT_TEMPLATE,
)

# On-disk locations used by the workflow.
CACHE_DIR = ".cache"  # For TTS caching
URL_CACHE_DIR = ".url_cache"
URL_CACHE_FILE = os.path.join(URL_CACHE_DIR, "presentations_cache")
TEMPLATE_DIR = os.getenv("TEMPLATE_DIR", "app/template")

# Build the inference client once at import time. `hf_client` is left as None
# when no API key is configured or when construction fails, so callers must
# check it before use.
try:
    if not HF_API_KEY:
        logger.warning("HF_API_KEY not found. LLM generation will fail.")
        hf_client = None
    else:
        hf_client = InferenceClient(token=HF_API_KEY, provider="cohere")
except Exception as e:
    logger.error(f"Failed to initialize Hugging Face client: {e}")
    hf_client = None
| # --- Helper Functions --- | |
def fetch_webpage_content(url):
    """Download *url* and return a plain-text digest of the page.

    The digest is built from three groups of elements, in this order:
    all headings (h1-h6), all paragraphs, and all list items (each prefixed
    with "- "). Afterwards every run of two or more whitespace characters is
    collapsed into a single space and the result is stripped.

    Returns:
        The cleaned text, or None when the HTTP request fails (including a
        non-success status code) or when parsing raises.
    """
    logger.info(f"Fetching content from: {url}")
    try:
        page = requests.get(url, timeout=15)
        # Turn 4xx/5xx answers into exceptions handled below.
        page.raise_for_status()

        soup = BeautifulSoup(page.text, "html.parser")
        heading_tags = soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"])
        paragraph_tags = soup.find_all("p")
        item_tags = soup.find_all("li")

        sections = [
            "\n".join(tag.get_text() for tag in heading_tags),
            "\n".join(tag.get_text() for tag in paragraph_tags),
            "\n".join("- " + tag.get_text() for tag in item_tags),
        ]
        # Simple cleanup: squeeze whitespace runs (this also flattens the
        # blank lines that joined the three sections).
        cleaned = re.sub(r"\s\s+", " ", "\n\n".join(sections)).strip()

        logger.info(
            f"Successfully fetched and parsed content (length: {len(cleaned)})."
        )
        return cleaned
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching URL {url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Error parsing URL {url}: {e}")
        return None
def parse_presentation_markdown(markdown_content):
    """Split presentation markdown into a list of slide dictionaries.

    A slide boundary is any line that consists solely of ``---`` (LF or CRLF
    line endings), which is how the slide renderer itself separates slides.
    Blank lines around the separator are optional, so the number of parsed
    slides stays in step with the number of rendered pages.

    Within a slide, everything after the first ``???`` is treated as speaker
    notes; slides without ``???`` get empty notes.

    Args:
        markdown_content: Full presentation markdown, or None.

    Returns:
        A list of ``{"id": int, "content": str, "notes": str}`` dictionaries,
        with ids numbered from 0 in document order. Returns an empty list when
        the input is None, empty, or whitespace only, so that callers can
        detect "nothing to parse".
    """
    if not markdown_content or not markdown_content.strip():
        logger.info("Parsed 0 slides from markdown.")
        return []

    # MULTILINE anchors let consecutive separators and CRLF input split
    # correctly; the surrounding newlines are removed by strip() below.
    slide_parts = re.split(r"^---\r?$", markdown_content, flags=re.MULTILINE)

    slides = []
    for i, part in enumerate(slide_parts):
        if "???" in part:
            content, notes = part.split("???", 1)
        else:
            # Slides without notes (e.g. a title slide) still get the field.
            content, notes = part, ""
        slides.append({"id": i, "content": content.strip(), "notes": notes.strip()})

    logger.info(f"Parsed {len(slides)} slides from markdown.")
    return slides
def reconstruct_presentation_markdown(slides_data):
    """Rebuild the presentation markdown string from slide dictionaries.

    Each slide contributes its ``content`` and, when it has notes that are
    not just whitespace, a ``???`` separator followed by the stripped notes.
    Slides are joined with a ``---`` separator surrounded by blank lines.

    Args:
        slides_data: Iterable of dictionaries with ``"content"`` and
            ``"notes"`` keys. Either value may be None (treated as empty).

    Returns:
        The combined markdown string; an empty string for no slides.
    """
    rendered = []
    for slide in slides_data:
        content = slide["content"] or ""
        # Strip before testing so whitespace-only notes do not leave a
        # dangling "???" separator with nothing after it.
        notes = (slide["notes"] or "").strip()
        slide_md = f"{content}\n\n???\n{notes}" if notes else content
        rendered.append(slide_md.strip())
    return "\n\n---\n\n".join(rendered)
def generate_pdf_from_markdown(markdown_file_path, output_pdf_path):
    """Generate a PDF from a Markdown file using ``bs export`` + ``decktape``.

    Step 1 runs backslide (``bs export``) in the markdown file's directory to
    produce ``bs_html_output/<name>.html``. Step 2 points decktape at that
    HTML file to write the PDF. The intermediate HTML directory is removed
    afterwards, whether or not either step succeeded.

    Args:
        markdown_file_path: Path to the presentation markdown file.
        output_pdf_path: Destination path for the PDF; parent directories are
            created if needed.

    Returns:
        ``output_pdf_path`` once the PDF exists on disk.

    Raises:
        gr.Error: If either tool is missing, exits non-zero, times out, fails
            unexpectedly, or finishes without producing its output file.
    """
    logger.info(f"Attempting PDF gen: {markdown_file_path} -> {output_pdf_path}")
    # Resolve to an absolute directory: a bare filename would otherwise give
    # an empty cwd for subprocess and a relative path that as_uri() rejects.
    working_dir = os.path.dirname(os.path.abspath(markdown_file_path))
    markdown_filename = os.path.basename(markdown_file_path)
    html_output_dir_name = "bs_html_output"
    html_output_dir_abs = os.path.join(working_dir, html_output_dir_name)
    expected_html_filename = os.path.splitext(markdown_filename)[0] + ".html"
    generated_html_path_abs = os.path.join(html_output_dir_abs, expected_html_filename)
    pdf_gen_success = False

    try:
        # ---- Step 1: Generate HTML using bs export ----
        try:
            Path(html_output_dir_abs).mkdir(parents=True, exist_ok=True)
            export_command = ["bs", "export", markdown_filename, "-o", html_output_dir_name]
            logger.info(f"Running: {' '.join(export_command)} in CWD: {working_dir}")
            export_result = subprocess.run(
                export_command,
                cwd=working_dir,
                capture_output=True,
                text=True,
                check=True,
                timeout=60,
            )
            logger.info("Backslide (bs export) OK.")
            logger.debug(f"bs export stdout:\n{export_result.stdout}")
            logger.debug(f"bs export stderr:\n{export_result.stderr}")
        except FileNotFoundError as e:
            logger.error(
                "`bs` command not found. Install backslide (`npm install -g backslide`)."
            )
            raise gr.Error("HTML generation tool (backslide/bs) not found.") from e
        except subprocess.CalledProcessError as e:
            logger.error(f"Backslide (bs export) failed (code {e.returncode}).")
            logger.error(f"bs stderr:\n{e.stderr}")
            raise gr.Error(f"Backslide HTML failed: {(e.stderr or '')[:500]}...") from e
        except subprocess.TimeoutExpired as e:
            logger.error("Backslide (bs export) timed out.")
            raise gr.Error("HTML generation timed out (backslide).") from e
        except Exception as e:
            logger.error(f"Unexpected error during bs export: {e}", exc_info=True)
            raise gr.Error(f"Unexpected error during HTML generation: {e}") from e

        # Checked outside the try above so that a missing output file is not
        # mistaken for a missing `bs` executable (both are "file not found").
        if not os.path.exists(generated_html_path_abs):
            logger.error(f"Expected HTML not found: {generated_html_path_abs}")
            try:
                files_in_dir = os.listdir(html_output_dir_abs)
                logger.error(f"Files in {html_output_dir_abs}: {files_in_dir}")
            except FileNotFoundError:
                logger.error(
                    f"HTML output directory {html_output_dir_abs} not found after bs run."
                )
            raise gr.Error(f"Generated HTML not found: {generated_html_path_abs}")

        # ---- Step 2: Generate PDF from HTML using decktape ----
        try:
            Path(output_pdf_path).parent.mkdir(parents=True, exist_ok=True)
            html_file_url = Path(generated_html_path_abs).as_uri()
            decktape_command = [
                "decktape",
                "-p",
                "1000",
                "--chrome-arg=--allow-running-insecure-content",
                "--chrome-arg=--disable-web-security",
                "--chrome-arg=--no-sandbox",
                html_file_url,
                str(output_pdf_path),
            ]
            logger.info(f"Running PDF conversion: {' '.join(decktape_command)}")
            decktape_result = subprocess.run(
                decktape_command,
                capture_output=True,
                text=True,
                check=True,
                timeout=120,
            )
            logger.info("Decktape command executed successfully.")
            logger.debug(f"decktape stdout:\n{decktape_result.stdout}")
            logger.debug(f"decktape stderr:\n{decktape_result.stderr}")
        except FileNotFoundError as e:
            logger.error(
                "`decktape` command not found. Install decktape (`npm install -g decktape`)."
            )
            raise gr.Error("PDF generation tool (decktape) not found.") from e
        except subprocess.CalledProcessError as e:
            logger.error(f"Decktape command failed (code {e.returncode}).")
            logger.error(f"decktape stderr:\n{e.stderr}")
            raise gr.Error(f"Decktape PDF failed: {(e.stderr or '')[:500]}...") from e
        except subprocess.TimeoutExpired as e:
            logger.error("Decktape command timed out.")
            raise gr.Error("PDF generation timed out (decktape).") from e
        except Exception as e:
            logger.error(
                f"Unexpected error during decktape PDF generation: {e}", exc_info=True
            )
            raise gr.Error(f"Unexpected error during PDF generation: {e}") from e

        # Also outside the try, so this specific message is not re-wrapped
        # as an "unexpected" error by the generic handler.
        if not os.path.exists(output_pdf_path):
            logger.error("Decktape command finished but output PDF not found.")
            raise gr.Error("Decktape finished, but the PDF file was not created.")

        logger.info(f"PDF generated successfully: {output_pdf_path}")
        pdf_gen_success = True
        return output_pdf_path
    finally:
        # --- Cleanup HTML output directory (after success or any failure) ---
        if os.path.exists(html_output_dir_abs):
            try:
                shutil.rmtree(html_output_dir_abs)
                logger.info(f"Cleaned up HTML temp dir: {html_output_dir_abs}")
            except Exception as cleanup_e:
                logger.warning(
                    f"Could not cleanup HTML dir {html_output_dir_abs}: {cleanup_e}"
                )
        # Log final status
        if pdf_gen_success:
            logger.info(f"PDF generation process completed for {output_pdf_path}.")
        else:
            logger.error(f"PDF generation process failed for {output_pdf_path}.")
| # --- Helper Function to Read CSS --- | |
def load_css(css_filename="style.scss"):
    """Return the text of a stylesheet stored in the template directory.

    Args:
        css_filename: File name, resolved relative to ``TEMPLATE_DIR``.

    Returns:
        The file's contents decoded as UTF-8, or an empty string (never None)
        when the file is missing or cannot be read.
    """
    stylesheet_path = os.path.join(TEMPLATE_DIR, css_filename)
    stylesheet = ""  # Fallback used by both failure branches below.
    try:
        with open(stylesheet_path, encoding="utf-8") as handle:
            stylesheet = handle.read()
    except FileNotFoundError:
        logger.warning(
            f"CSS file not found at {stylesheet_path}. No custom styles applied."
        )
    except Exception as e:
        logger.error(f"Error reading CSS file {stylesheet_path}: {e}")
    return stylesheet
| # --- Gradio Workflow Functions --- | |
def _step1_status_only(message):
    """Build a step-1 output tuple that changes nothing but the status text."""
    return (
        gr.update(value=message),
        None,
        None,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )


def _step1_editor_ready(message, temp_dir, md_path, slides_data):
    """Build the final step-1 output tuple that reveals the slide editor."""
    return (
        gr.update(value=message),
        temp_dir,
        md_path,
        slides_data,
        gr.update(visible=True),  # editor_column
        gr.update(visible=True),  # btn_build_slides (Enable PDF button next)
        gr.update(interactive=False),  # btn_fetch_generate (disable)
    )


def _load_cached_presentation(url):
    """Return (presentation_md, slides_data) cached for *url*, or (None, None).

    The shelve file is opened only for the lookup and is closed again before
    this function returns.
    """
    os.makedirs(URL_CACHE_DIR, exist_ok=True)  # Ensure cache dir exists
    with shelve.open(URL_CACHE_FILE) as cache:
        if url not in cache:
            return None, None
        logger.info(f"Cache hit for URL: {url}")
        cached_data = cache[url]

    presentation_md = cached_data.get("presentation_md")
    slides_data = cached_data.get("slides_data")
    if not presentation_md:
        logger.warning(f"Cache for {url} missing 'presentation_md'. Regenerating.")
    if not slides_data:
        logger.warning(f"Cache for {url} missing 'slides_data'. Regenerating.")
    if not (presentation_md and slides_data):
        return None, None
    logger.info(
        f"Found complete cache entry for {url} with {len(slides_data)} slides."
    )
    return presentation_md, slides_data


def _create_session_workspace(presentation_md):
    """Create a temp dir holding presentation.md plus a copy of the template.

    Returns (temp_dir, md_path). On any failure the temp dir is removed again
    before the exception propagates.
    """
    temp_dir = tempfile.mkdtemp()
    md_path = os.path.join(temp_dir, "presentation.md")
    try:
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(presentation_md)
        logger.info(f"Presentation markdown saved to temp file: {md_path}")

        if not os.path.isdir(TEMPLATE_DIR):
            logger.error(f"Template source dir '{TEMPLATE_DIR}' not found.")
            raise gr.Error(f"Required template '{TEMPLATE_DIR}' not found.")

        template_dest_dir = os.path.join(temp_dir, os.path.basename(TEMPLATE_DIR))
        try:
            shutil.copytree(TEMPLATE_DIR, template_dest_dir)
        except Exception as copy_e:
            logger.error(f"Failed to copy template directory: {copy_e}")
            raise gr.Error(f"Failed to prepare template: {copy_e}") from copy_e
        logger.info(f"Copied template directory to {template_dest_dir}")
    except Exception:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    return temp_dir, md_path


def step1_fetch_and_generate_presentation(url, progress=gr.Progress(track_tqdm=True)):
    """Fetch a page, generate presentation markdown, and prepare the editor.

    This is a generator: every yielded value is a 7-tuple of
    (status update, temp dir, markdown path, slides data, editor column
    update, build-button update, fetch-button update).

    Results are cached per URL. A complete cache entry skips fetching and LLM
    generation. In both paths a fresh temporary workspace is created that
    contains ``presentation.md`` and a copy of the template directory.

    Args:
        url: Address of the web page to turn into a presentation.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: Only when *url* is empty. Every later failure is logged and
            reported through the yielded status text instead of being raised,
            and the fetch button is re-enabled.
    """
    if not url:
        raise gr.Error("Please enter a URL.")
    logger.info(f"Step 1: Fetching & Generating for {url}")
    yield _step1_status_only(f"Starting Step 1: Fetching content from {url}...")

    try:
        # --- Cache Check ---
        presentation_md, slides_data = _load_cached_presentation(url)
        if presentation_md and slides_data:
            progress(0.5, desc="Loading cached presentation...")
            temp_dir, md_path = _create_session_workspace(presentation_md)
            progress(0.9, desc="Preparing editor from cache...")
            status_update = "Loaded presentation from cache. Preparing editor..."
            yield _step1_status_only(status_update)
            logger.info(f"Using cached data for {len(slides_data)} slides.")
            yield _step1_editor_ready(status_update, temp_dir, md_path, slides_data)
            return  # End generator here for cache hit

        # --- Cache Miss or Failed Cache Load ---
        logger.info(f"Cache miss for URL: {url}. Proceeding with generation.")
        progress(0.1, desc="Fetching webpage content...")
        if not hf_client:
            raise gr.Error("LLM Client not initialized. Check API Key.")
        yield _step1_status_only("Fetching webpage content...")

        web_content = fetch_webpage_content(url)
        if not web_content:
            raise gr.Error("Failed to fetch or parse content from the URL.")

        progress(0.3, desc="Generating presentation with LLM...")
        yield _step1_status_only("Generating presentation with LLM...")
        try:
            presentation_md = generate_presentation_with_llm(
                hf_client, LLM_MODEL, PRESENTATION_PROMPT, web_content, url
            )
        except Exception as e:
            logger.error(f"Error during LLM call: {e}", exc_info=True)
            raise gr.Error(f"Failed to generate presentation from LLM: {e}") from e
        if not presentation_md:
            logger.error("LLM generation returned None.")
            raise gr.Error("LLM generation failed (received None).")

        # Check for basic structure early, but parsing handles final validation
        if "---" not in presentation_md:
            logger.warning(
                "LLM output missing slide separators ('---'). Parsing might fail."
            )
        if "???" not in presentation_md:
            logger.warning(
                "LLM output missing notes separators ('???'). Notes might be empty."
            )

        progress(0.7, desc="Parsing presentation slides...")
        yield _step1_status_only("Parsing presentation slides...")
        slides_data = parse_presentation_markdown(presentation_md)
        if not slides_data:
            logger.error("Parsing markdown resulted in zero slides.")
            raise gr.Error("Failed to parse generated presentation markdown.")

        # Create a temporary directory for this session
        temp_dir, md_path = _create_session_workspace(presentation_md)

        # --- Store in Cache ---
        # Best effort: a cache write failure must not lose the generated deck.
        try:
            with shelve.open(URL_CACHE_FILE) as cache_write:
                cache_write[url] = {
                    "presentation_md": presentation_md,
                    "slides_data": slides_data,
                }
            logger.info(f"Stored generated presentation in cache for URL: {url}")
        except Exception as e:
            logger.error(f"Failed to write to cache for URL {url}: {e}")

        progress(0.9, desc="Preparing editor...")
        status_update = "Generated presentation. Preparing editor..."
        yield _step1_status_only(status_update)
        logger.info(f"Prepared data for {len(slides_data)} slides.")
        # Final yield must match outputs list
        yield _step1_editor_ready(status_update, temp_dir, md_path, slides_data)
    except Exception as e:
        logger.error(f"Error in step 1 (fetch/generate): {e}", exc_info=True)
        status_update = f"Error during presentation setup: {e}"
        yield _step1_status_only(status_update)
        # Final tuple matching outputs, putting the UI back into a retry state.
        # The error is deliberately not re-raised so the status box shows it.
        yield (
            gr.update(value=status_update),
            None,
            None,
            [],
            gr.update(visible=False),  # Keep editor hidden
            gr.update(visible=False),  # Keep build button hidden
            gr.update(interactive=True),  # Re-enable fetch button
        )
def _step2_status_only(message, pdf_path=None):
    """Build a step-2 output tuple that only updates status (and PDF state)."""
    return (
        gr.update(value=message),
        pdf_path,
        [],
        gr.update(),
        gr.update(),
        gr.update(),
    )


def _step2_retry(message):
    """Build the step-2 output tuple shown after a failure: build button visible."""
    return (
        gr.update(value=message),
        None,
        [],
        gr.update(),
        gr.update(visible=True),
        gr.update(),
    )


def step2_build_slides(
    state_temp_dir,
    state_md_path,
    state_slides_data,
    *editors,
    progress=gr.Progress(track_tqdm=True),
):
    """Save the edited slides, build the PDF, and convert it to images.

    This is a generator: every yielded value is a 6-tuple of
    (status update, PDF path, image list, audio-button update,
    build-button update, PDF download link update).

    Args:
        state_temp_dir: Session working directory created in step 1.
        state_md_path: Path of the session's presentation markdown file.
        state_slides_data: Slide dictionaries from step 1; only the count is
            used here.
        *editors: Exactly ``2 * MAX_SLIDES`` values: all slide content
            editors first, followed by all notes editors.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: If session state is missing, the editor inputs do not match
            the slide count, the markdown cannot be saved, PDF generation
            fails, or PDF-to-image conversion fails. Where possible a status
            tuple is yielded before raising.
    """
    if not all([state_temp_dir, state_md_path, state_slides_data]):
        raise gr.Error("Session state missing.")
    logger.info("Step 2: Building Slides (PDF + Images)")
    yield _step2_status_only("Starting Step 2: Building slides...")

    num_slides = len(state_slides_data)
    MAX_SLIDES = 20
    all_editors = list(editors)
    if len(all_editors) != MAX_SLIDES * 2:
        raise gr.Error(f"Incorrect editor inputs: {len(all_editors)}")
    # First half holds slide contents, second half the notes; only the first
    # num_slides entries of each half belong to real slides.
    edited_contents = all_editors[:MAX_SLIDES][:num_slides]
    edited_notes_list = all_editors[MAX_SLIDES:][:num_slides]
    if len(edited_contents) != num_slides or len(edited_notes_list) != num_slides:
        raise gr.Error("Editor input mismatch.")

    progress(0.1, desc="Saving edited markdown...")
    yield _step2_status_only("Saving edited markdown...")
    updated_slides = [
        {"id": i, "content": content, "notes": notes}
        for i, (content, notes) in enumerate(zip(edited_contents, edited_notes_list))
    ]
    updated_md = reconstruct_presentation_markdown(updated_slides)
    try:
        with open(state_md_path, "w", encoding="utf-8") as f:
            f.write(updated_md)
        logger.info(f"Saved edited markdown: {state_md_path}")
    except IOError as e:
        status_update = f"Failed to save markdown: {e}"
        yield _step2_status_only(status_update)
        # Final yield must match outputs
        yield _step2_retry(status_update)
        raise gr.Error(f"Failed to save markdown: {e}") from e

    progress(0.3, desc="Generating PDF...")
    yield _step2_status_only("Generating PDF...")
    pdf_output_path = os.path.join(state_temp_dir, "presentation.pdf")
    try:
        generated_pdf_path = generate_pdf_from_markdown(state_md_path, pdf_output_path)
        if not generated_pdf_path:
            raise gr.Error("PDF generation failed (check logs).")
    except gr.Error as e:
        status_update = f"PDF Generation Error: {e}"
        yield _step2_status_only(status_update)
        # Final yield must match outputs
        yield _step2_retry(status_update)
        raise
    except Exception as e:
        status_update = f"Unexpected PDF Error: {e}"
        yield _step2_status_only(status_update)
        # Final yield must match outputs
        yield _step2_retry(status_update)
        raise gr.Error(f"Unexpected error generating PDF: {e}") from e

    progress(0.7, desc="Converting PDF to images...")
    yield _step2_status_only("Converting PDF to images...", generated_pdf_path)
    pdf_images = []
    # Kept separately so the final "complete" status can still carry it.
    mismatch_note = ""
    try:
        pdf_images = convert_pdf_to_images(generated_pdf_path, dpi=150)
        if not pdf_images:
            raise gr.Error("PDF to image conversion failed.")
        logger.info(f"Converted PDF to {len(pdf_images)} images.")
        if len(pdf_images) != num_slides:
            warning_msg = f"Warning: PDF page count ({len(pdf_images)}) != slide count ({num_slides}). Images might mismatch."
            logger.warning(warning_msg)
            gr.Warning(warning_msg)
            mismatch_note = f" ({warning_msg})"
    except Exception as e:
        logger.error(f"Error converting PDF to images: {e}", exc_info=True)
        status_update = f"Failed to convert PDF to images: {e}"
        yield _step2_status_only(status_update, generated_pdf_path)
        # Final yield must match outputs; the PDF itself is still offered.
        yield (
            gr.update(value=status_update),
            generated_pdf_path,
            [],
            gr.update(),
            gr.update(visible=True),
            gr.update(value=generated_pdf_path, visible=True),
        )
        raise gr.Error(f"Failed to convert PDF to images: {e}") from e

    info_msg = f"Built {len(pdf_images)} slide images. Ready for Step 3."
    logger.info(info_msg)
    progress(1.0, desc="Slide build complete.")
    status_update = f"Step 2 Complete: {info_msg}{mismatch_note}"
    yield (
        gr.update(value=status_update),
        generated_pdf_path,
        pdf_images,  # Return the list of image paths
        gr.update(visible=True),  # btn_generate_audio
        gr.update(visible=False),  # btn_build_slides
        gr.update(value=generated_pdf_path, visible=True),  # pdf_download_link
    )
def _step3_pending(message, audio_dir, max_slides):
    """Build a step-3 progress tuple that leaves all per-slide widgets as-is."""
    return (gr.update(value=message), audio_dir, gr.update(), gr.update()) + (
        gr.update(),
    ) * max_slides * 2


def step3_generate_audio(*args, progress=gr.Progress(track_tqdm=True)):
    """Generate one audio file per slide from the edited speaker notes.

    This is a generator. Each yielded tuple holds (status update, audio
    directory, video-button update, audio-button update) followed by
    ``MAX_SLIDES`` audio-player updates and ``MAX_SLIDES`` regenerate-button
    updates.

    Slides whose notes are empty get a short silent clip produced with
    ffmpeg; all others go through ``text_to_speech``. Failures on individual
    slides are logged and summarised in the final status, not raised.

    Args:
        *args: Positional inputs in this order:
            ``args[0]`` state_temp_dir,
            ``args[1]`` state_md_path,
            ``args[2]`` original slides data (list of slide dictionaries),
            ``args[3:3 + MAX_SLIDES]`` slide content editors,
            ``args[3 + MAX_SLIDES:]`` notes textboxes.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: If session state or slide data is missing, or the editor
            values cannot be sliced to match the slide count.
    """
    MAX_SLIDES = 20  # Ensure this matches UI definition

    # Validate state before touching it, so lost state produces the intended
    # user-facing error and not a TypeError/IndexError.
    if len(args) < 3:
        raise gr.Error("Session state lost (Audio step). Please start over.")
    state_temp_dir, state_md_path, original_slides_data = args[0], args[1], args[2]
    if not state_temp_dir or not state_md_path:
        raise gr.Error("Session state lost (Audio step). Please start over.")
    if not original_slides_data:
        logger.error("Step 3 (Audio) called with zero slides data.")
        raise gr.Error("No slide data available. Please start over.")
    num_slides = len(original_slides_data)

    # Editors start at index 3: contents first, then notes.
    code_editors_start_index = 3
    notes_textboxes_start_index = 3 + MAX_SLIDES
    # Slice the *actual* edited values based on num_slides
    edited_contents = args[
        code_editors_start_index : code_editors_start_index + num_slides
    ]
    edited_notes_list = args[
        notes_textboxes_start_index : notes_textboxes_start_index + num_slides
    ]
    if len(edited_contents) != num_slides or len(edited_notes_list) != num_slides:
        logger.error(
            f"Input slicing error (Audio step): Expected {num_slides}, got {len(edited_contents)} contents, {len(edited_notes_list)} notes."
        )
        raise gr.Error(
            f"Input processing error: Mismatch after slicing ({num_slides} slides)."
        )

    logger.info(f"Processing {num_slides} slides for audio generation.")
    status_update = "Starting Step 3: Generating audio..."
    yield _step3_pending(status_update, None, MAX_SLIDES)

    audio_dir = os.path.join(state_temp_dir, "audio")
    os.makedirs(audio_dir, exist_ok=True)

    # --- Update presentation.md AGAIN in case notes changed after PDF ---
    # Possibly redundant, but ensures the file matches the latest notes.
    progress(0.1, desc="Saving latest notes...")
    status_update = "Saving latest notes..."
    yield _step3_pending(status_update, None, MAX_SLIDES)

    updated_slides_data = [
        {
            "id": original["id"],  # Keep original ID
            "content": content,  # Use sliced edited content
            "notes": notes,  # Use sliced edited notes
        }
        for original, content, notes in zip(
            original_slides_data, edited_contents, edited_notes_list
        )
    ]
    updated_markdown = reconstruct_presentation_markdown(updated_slides_data)
    try:
        with open(state_md_path, "w", encoding="utf-8") as f:
            f.write(updated_markdown)
        logger.info(f"Updated presentation markdown before audio gen: {state_md_path}")
    except IOError as e:
        # Audio generation continues even if saving the markdown fails.
        warning_msg = f"Warning: Could not save latest notes to markdown file: {e}"
        gr.Warning(warning_msg)
        status_update += f" ({warning_msg})"
        yield _step3_pending(status_update, None, MAX_SLIDES)

    generated_audio_paths = ["" for _ in range(num_slides)]
    audio_generation_failed = False
    successful_audio_count = 0  # Counts TTS successes only, not silent clips.
    for i, note_text in enumerate(edited_notes_list):
        slide_num = i + 1
        progress(
            slide_num / num_slides * 0.8 + 0.1,
            desc=f"Audio slide {slide_num}/{num_slides}",
        )
        status_update = f"Generating audio for slide {slide_num}/{num_slides}..."
        yield _step3_pending(status_update, audio_dir, MAX_SLIDES)

        output_file_path = Path(audio_dir) / f"{slide_num}.wav"
        if not note_text or not note_text.strip():
            try:  # Generate silence
                subprocess.run(
                    [
                        "ffmpeg",
                        "-y",
                        "-f",
                        "lavfi",
                        "-i",
                        "anullsrc=r=44100:cl=mono",
                        "-t",
                        "0.1",
                        "-q:a",
                        "9",
                        str(output_file_path),
                    ],
                    check=True,
                    capture_output=True,
                    text=True,
                    timeout=30,  # Never let a stuck ffmpeg hang the UI.
                )
                generated_audio_paths[i] = str(output_file_path)
            except Exception as e:
                audio_generation_failed = True
                logger.error(f"Silence gen failed slide {i + 1}: {e}")
            continue

        try:  # Generate TTS
            success = text_to_speech(
                note_text, output_file_path, voice=VOICE_ID, cache_dir=CACHE_DIR
            )
            if success:
                generated_audio_paths[i] = str(output_file_path)
                successful_audio_count += 1
            else:
                audio_generation_failed = True
                logger.error(f"TTS failed slide {i + 1}")
        except Exception as e:
            audio_generation_failed = True
            logger.error(f"TTS exception slide {i + 1}: {e}", exc_info=True)

    # --- Prepare outputs for Gradio ---
    # Real slides get their player/regenerate button; the unused widget slots
    # up to MAX_SLIDES are hidden.
    unused_slots = MAX_SLIDES - num_slides
    audio_player_updates = [
        gr.update(value=p if p else None, visible=bool(p and os.path.exists(p)))
        for p in generated_audio_paths
    ]
    audio_player_updates.extend([gr.update(value=None, visible=False)] * unused_slots)
    regen_button_updates = [gr.update(visible=True)] * num_slides
    regen_button_updates.extend([gr.update(visible=False)] * unused_slots)

    info_msg = f"Generated {successful_audio_count}/{num_slides} audio clips. "
    if audio_generation_failed:
        info_msg += "Some audio failed. Review/Regenerate before video."
        gr.Warning(info_msg)
    else:
        info_msg += "Ready for Step 4."
    logger.info(info_msg)
    progress(1.0, desc="Audio generation complete.")
    status_update = f"Step 3 Complete: {info_msg}"
    # Final yield must match outputs list
    yield (
        gr.update(value=status_update),
        audio_dir,
        gr.update(visible=True),  # btn_generate_video
        gr.update(visible=False),  # btn_generate_audio
        *audio_player_updates,
        *regen_button_updates,
    )
def step4_generate_video(
    state_temp_dir,
    state_audio_dir,
    state_pdf_path,  # Use PDF path from state
    progress=gr.Progress(track_tqdm=True),
):
    """Generate the final video from the slide PDF and the per-slide audio files.

    This is a generator. Every yielded value is a 3-tuple of Gradio updates for
    ``(status_textbox, video_output, btn_generate_video)``; intermediate yields
    only refresh the status text.

    Args:
        state_temp_dir: Session working directory; the video is written there
            as ``final_presentation.mp4``.
        state_audio_dir: Directory searched for ``*.wav`` files.
        state_pdf_path: Path of the slide PDF that is converted to images.
        progress: Gradio progress tracker.

    Raises:
        gr.Error: If session state is missing, the PDF or the audio directory
            does not exist, or any stage of video generation fails.
    """
    if not state_temp_dir or not state_audio_dir or not state_pdf_path:
        raise gr.Error("Session state lost (Video step). Please start over.")
    if not os.path.exists(state_pdf_path):
        raise gr.Error(f"PDF file not found: {state_pdf_path}. Cannot generate video.")
    if not os.path.isdir(state_audio_dir):
        raise gr.Error(
            f"Audio directory not found: {state_audio_dir}. Cannot generate video."
        )

    def _status_only(message):
        # Update the status textbox and leave the video and the button untouched.
        return (gr.update(value=message), gr.update(), gr.update())

    video_output_path = os.path.join(state_temp_dir, "final_presentation.mp4")
    status_update = "Starting Step 4: Generating video..."
    yield _status_only(status_update)
    progress(0.1, desc="Preparing video components...")

    pdf_images = []  # Bound before the try so the finally clause can always inspect it
    try:
        audio_files = find_audio_files(state_audio_dir, "*.wav")
        if not audio_files:
            warning_msg = f"Warning: No WAV files found in {state_audio_dir}. Video might lack audio."
            logger.warning(warning_msg)
            status_update += f" ({warning_msg})"
            # Surface the warning right away; the next status message replaces it.
            yield _status_only(status_update)

        progress(0.2, desc="Converting PDF to images...")
        status_update = "Converting PDF back to images for video..."
        yield _status_only(status_update)
        pdf_images = convert_pdf_to_images(state_pdf_path, dpi=150)
        if not pdf_images:
            raise gr.Error(f"Failed to convert PDF ({state_pdf_path}) to images.")

        # A count mismatch is only warned about; clip creation is still attempted.
        if len(pdf_images) != len(audio_files):
            warning_msg = f"Warning: Mismatch: {len(pdf_images)} PDF pages vs {len(audio_files)} audio files. Video clips might have incorrect durations or missing audio."
            logger.warning(warning_msg)
            status_update += f" ({warning_msg})"
            yield _status_only(status_update)

        progress(0.5, desc="Creating individual video clips...")
        status_update = "Creating individual video clips..."
        yield _status_only(status_update)
        buffer_seconds = 1.0
        output_fps = 10
        video_clips = create_video_clips(
            pdf_images, audio_files, buffer_seconds, output_fps
        )
        if not video_clips:
            raise gr.Error("Failed to create any video clips.")

        progress(0.8, desc="Concatenating clips...")
        status_update = "Concatenating clips into final video..."
        yield _status_only(status_update)
        concatenate_clips(video_clips, video_output_path, output_fps)
        logger.info(f"Video concatenation complete: {video_output_path}")

        progress(0.95, desc="Cleaning up temp images...")
        status_update = "Cleaning up temporary image files..."
        yield _status_only(status_update)
    except Exception as e:
        logger.error(f"Video generation failed: {e}", exc_info=True)
        status_update = f"Video generation failed: {e}"
        # Keep the "Create Video" button visible so the user can retry.
        yield (gr.update(value=status_update), gr.update(), gr.update(visible=True))
        raise gr.Error(f"Video generation failed: {e}") from e
    finally:
        # Runs on success, on failure, and if the generator is closed early,
        # so the temporary page images are removed on every exit path.
        if pdf_images:
            try:
                cleanup_temp_files(pdf_images)
            except Exception:
                # Best effort: a cleanup problem must not hide the real outcome.
                logger.warning(
                    "Could not remove temporary slide images.", exc_info=True
                )

    info_msg = f"Video generated: {os.path.basename(video_output_path)}"
    logger.info(info_msg)
    progress(1.0, desc="Video Complete.")
    status_update = f"Step 4 Complete: {info_msg}"
    yield (
        gr.update(value=status_update),
        gr.update(value=video_output_path, visible=True),  # video_output
        gr.update(visible=False),  # btn_generate_video
    )
def cleanup_session(temp_dir):
    """Delete the session's temporary directory tree and report the outcome.

    Args:
        temp_dir: Path of the directory to remove. Anything that is falsy, not
            a ``str``, or does not exist on disk is rejected.

    Returns:
        A short human-readable message describing what happened.
    """
    removable = bool(temp_dir) and isinstance(temp_dir, str) and os.path.exists(temp_dir)
    if not removable:
        logger.warning(f"Cleanup called but temp_dir invalid or not found: {temp_dir}")
        return "No valid temporary directory found to clean."

    try:
        shutil.rmtree(temp_dir)
        logger.info(f"Cleaned up temporary directory: {temp_dir}")
        return "Cleaned up session files."
    except Exception as exc:
        # Deletion problems are reported to the caller instead of being raised.
        logger.error(f"Error cleaning up temp directory {temp_dir}: {exc}")
        return f"Error during cleanup: {exc}"
| # --- Dependency Check and Installation --- (Added) | |
def check_and_install_npm_dependency(command_name, package_name, install_instructions):
    """Ensure a command-line tool is available, installing it via npm if needed.

    Args:
        command_name: Executable to look for on ``PATH``.
        package_name: npm package installed globally when the command is missing.
        install_instructions: Hint appended to the log message when ``npm``
            itself cannot be found.

    Returns:
        ``True`` if the command is available (already, or after installation),
        ``False`` otherwise. Failures are logged, never raised.
    """
    if shutil.which(command_name):
        logger.info(f"Command '{command_name}' found.")
        return True

    logger.warning(
        f"Command '{command_name}' not found. Attempting to install '{package_name}' globally via npm..."
    )

    # Resolve npm through PATH first. shutil.which honours PATHEXT, so this also
    # finds the `npm.cmd` launcher typically used on Windows, which a bare
    # "npm" argv[0] would miss when no shell is involved.
    npm_executable = shutil.which("npm")
    if npm_executable is None:
        logger.error(
            f"`npm` command not found. Cannot install '{package_name}'. {install_instructions}"
        )
        return False

    npm_command = [npm_executable, "install", "-g", package_name]
    try:
        result = subprocess.run(
            npm_command, check=True, capture_output=True, text=True, timeout=300
        )
    except FileNotFoundError:
        # npm disappeared between the lookup and the call.
        logger.error(
            f"`npm` command not found. Cannot install '{package_name}'. {install_instructions}"
        )
        return False
    except subprocess.CalledProcessError as e:
        logger.error(
            f"Failed to install '{package_name}' (return code {e.returncode})."
        )
        logger.error(f"npm stdout:\n{e.stdout}")
        logger.error(f"npm stderr:\n{e.stderr}")
        logger.error(
            "Installation might require administrator privileges (e.g., run with 'sudo')."
        )
        return False
    except subprocess.TimeoutExpired:
        logger.error(f"Installation of '{package_name}' timed out.")
        return False
    except Exception as e:
        # Startup must not crash on an optional install attempt.
        logger.error(
            f"An unexpected error occurred during '{package_name}' installation: {e}",
            exc_info=True,
        )
        return False

    logger.info(f"Successfully installed '{package_name}'.")
    logger.debug(f"npm stdout:\n{result.stdout}")
    logger.debug(f"npm stderr:\n{result.stderr}")

    # npm can exit 0 while the binary still is not reachable on PATH.
    if shutil.which(command_name):
        logger.info(f"Command '{command_name}' is now available.")
        return True
    logger.error(
        f"Installation of '{package_name}' reported success, but command '{command_name}' still not found. Check npm logs and PATH."
    )
    return False
# --- Gradio Interface ---
# Number of slide editor slots pre-built in the layout below. Slots beyond the
# actual slide count stay hidden.
MAX_SLIDES = 20
# Components per slot, in the order they are stored in `slide_editors_group`:
# accordion, code editor, markdown preview, notes, audio player, regen button,
# slide image.
_COMPONENTS_PER_SLIDE = 7


def _build_editor_updates(slides_data):
    """Build the updates that populate the slide editors after Step 1.

    Args:
        slides_data: Sequence of dicts with ``"content"`` and ``"notes"`` keys.
            Entries beyond ``MAX_SLIDES`` are ignored.

    Returns:
        A tuple holding one status update followed by
        ``MAX_SLIDES * _COMPONENTS_PER_SLIDE`` component updates.
    """
    slides = (slides_data or [])[:MAX_SLIDES]
    updates = [gr.update(value="Editor populated. Proceed to Step 2.")]
    for index, slide in enumerate(slides):
        content = slide["content"]
        # The accordion title is the first content line, trimmed to 30 characters.
        first_line = content.splitlines()[0] if content else ""
        title = first_line.strip()[:30]
        updates += [
            gr.update(
                label=f"Slide {index + 1}: {title}...",
                visible=True,
                open=(index == 0),
            ),  # Accordion
            gr.update(value=content, visible=True),  # Code Editor
            gr.update(value=content, visible=True),  # MD Preview
            gr.update(value=slide["notes"], visible=True),  # Notes Textbox
            gr.update(value=None, visible=False),  # Audio Player
            gr.update(visible=False),  # Regen Button
            gr.update(value=None, visible=False),  # Slide Image
        ]
    # Hide every component of the slots that have no slide.
    unused_slots = MAX_SLIDES - len(slides)
    updates += [
        gr.update(visible=False)
        for _ in range(unused_slots * _COMPONENTS_PER_SLIDE)
    ]
    return tuple(updates)


def _build_slide_image_updates(image_paths):
    """Build the updates that show the rendered slide images after Step 2.

    Args:
        image_paths: Sequence of image paths, one per slide.

    Returns:
        A tuple holding one status update followed by ``MAX_SLIDES`` image
        updates; slots without an image are cleared and hidden.
    """
    paths = image_paths or []
    updates = [gr.update(value="Slide images updated. Proceed to Step 3.")]
    for slot in range(MAX_SLIDES):
        has_image = slot < len(paths)
        updates.append(
            gr.update(value=paths[slot] if has_image else None, visible=has_image)
        )
    return tuple(updates)


# Load custom CSS
custom_css = load_css()

with gr.Blocks(
    theme=gr.themes.Soft(), css=custom_css, title="Webpage to Video"
) as demo:
    gr.Markdown("# Webpage to Video Presentation Generator")

    # State variables
    state_temp_dir = gr.State(None)
    state_md_path = gr.State(None)
    state_audio_dir = gr.State(None)
    state_pdf_path = gr.State(None)
    state_slides_data = gr.State([])
    state_pdf_image_paths = gr.State([])

    # --- Tabbed Interface ---
    with gr.Tabs(elem_id="tabs") as tabs_widget:
        # Tab 1: Generate Presentation
        with gr.TabItem("1. Generate Presentation", id=0):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 1:** Enter URL")
                    input_url = gr.Textbox(
                        label="Webpage URL",
                        value="https://huggingface.co/blog/llm-course",
                    )
                    btn_fetch_generate = gr.Button(
                        value="1. Fetch & Generate", variant="primary"
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Enter URL & click 'Fetch & Generate'.\n2. Editor appears below.\n3. Go to next tab."
                    )

        # Tab 2: Build Slides
        with gr.TabItem("2. Build Slides", id=1):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 2:** Review/Edit, then build slides.")
                    btn_build_slides = gr.Button(
                        value="2. Build Slides", variant="secondary", visible=False
                    )
                    pdf_download_link = gr.File(
                        label="Download PDF", visible=False, interactive=False
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Edit content/notes below.\n2. Click 'Build Slides'. Images appear.\n3. Download PDF from sidebar.\n4. Go to next tab."
                    )

        # Tab 3: Generate Audio
        with gr.TabItem("3. Generate Audio", id=2):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 3:** Review/Edit notes, then generate audio.")
                    btn_generate_audio = gr.Button(
                        value="3. Generate Audio", variant="primary", visible=False
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Finalize notes below.\n2. Click 'Generate Audio'.\n3. Regenerate if needed.\n4. Go to next tab."
                    )

        # Tab 4: Generate Video
        with gr.TabItem("4. Create Video", id=3):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("**Step 4:** Create the final video.")
                    btn_generate_video = gr.Button(
                        value="4. Create Video", variant="primary", visible=False
                    )
                with gr.Column(scale=4):
                    gr.Markdown(
                        "### Instructions\n1. Click 'Create Video'.\n2. Video appears below."
                    )
            # NOTE(review): placed below the row inside tab 4 to match the
            # "Video appears below" instruction - confirm against the intended layout.
            video_output = gr.Video(label="Final Video", visible=False)

    # --- Status Textbox (shown between the tabs and the editor column) ---
    with gr.Row():
        status_textbox = gr.Textbox(
            label="Status",
            value="Enter a URL and click 'Fetch & Generate' to start.",
            interactive=False,
            lines=1,
            max_lines=1,
        )

    # The editor structure is defined once and shared by all tabs.
    slide_editors_group = []
    with gr.Column(visible=False) as editor_column:  # Initially hidden
        gr.Markdown("--- \n## Edit Slides & Notes")
        gr.Markdown("_(PDF uses content & notes, Audio uses notes only)_")
        for i in range(MAX_SLIDES):
            with gr.Accordion(f"Slide {i + 1}", open=(i == 0), visible=False) as acc:
                with gr.Row():  # Row for Content/Preview/Image
                    with gr.Column(scale=1):
                        code_editor = gr.Code(
                            label="Content (Markdown)",
                            language="markdown",
                            lines=15,
                            interactive=True,
                            visible=False,
                        )
                        notes_textbox = gr.Textbox(
                            label="Script/Notes (for Audio)",
                            lines=8,
                            interactive=True,
                            visible=False,
                        )
                    with gr.Column(scale=1):
                        slide_image = gr.Image(
                            label="Slide Image",
                            visible=False,
                            interactive=False,
                        )
                        md_preview = gr.Markdown(visible=False)
                with gr.Row():  # Row for audio controls
                    audio_player = gr.Audio(
                        label="Generated Audio",
                        visible=False,
                        interactive=False,
                        scale=3,
                    )
                    regen_button = gr.Button(
                        value="Regen Audio", visible=False, scale=1, size="sm"
                    )
            # Tuple order must stay in sync with _COMPONENTS_PER_SLIDE and the
            # index-based lists built below.
            slide_editors_group.append(
                (
                    acc,
                    code_editor,
                    md_preview,
                    notes_textbox,
                    audio_player,
                    regen_button,
                    slide_image,
                )
            )
            # Mirror the editor text into the markdown preview on every change.
            code_editor.change(
                fn=lambda x: x,
                inputs=code_editor,
                outputs=md_preview,
                show_progress="hidden",
            )

    # --- Component Lists for Updates ---
    all_editor_components = [comp for group in slide_editors_group for comp in group]
    all_code_editors = [group[1] for group in slide_editors_group]
    all_notes_textboxes = [group[3] for group in slide_editors_group]
    all_audio_players = [group[4] for group in slide_editors_group]
    all_regen_buttons = [group[5] for group in slide_editors_group]
    all_slide_images = [group[6] for group in slide_editors_group]

    # --- Main Button Click Handlers ---
    # Follow-up steps are chained with .success() so they only run when the
    # preceding handler finished without raising; a failed step must not be
    # followed by a "Proceed to ..." status and a tab switch.

    # Step 1 Click Handler
    step1_outputs = [
        status_textbox,
        state_temp_dir,
        state_md_path,
        state_slides_data,
        editor_column,  # Show the editor column
        btn_build_slides,  # Enable the button in Tab 2
        btn_fetch_generate,  # Disable self
    ]
    btn_fetch_generate.click(
        fn=step1_fetch_and_generate_presentation,
        inputs=[input_url],
        outputs=step1_outputs,
        show_progress="full",
    ).success(
        fn=_build_editor_updates,
        inputs=[state_slides_data],
        outputs=[status_textbox] + all_editor_components,
        show_progress="hidden",
    ).success(lambda: gr.update(selected=1), outputs=tabs_widget)  # Switch to Tab 2

    # Step 2 Click Handler
    step2_inputs = (
        [state_temp_dir, state_md_path, state_slides_data]
        + all_code_editors
        + all_notes_textboxes
    )
    step2_outputs = [
        status_textbox,
        state_pdf_path,
        state_pdf_image_paths,
        btn_generate_audio,  # Enable button in Tab 3
        btn_build_slides,  # Disable self
        pdf_download_link,  # Update download link in Tab 2
    ]
    btn_build_slides.click(
        fn=step2_build_slides,
        inputs=step2_inputs,
        outputs=step2_outputs,
        show_progress="full",
    ).success(
        fn=_build_slide_image_updates,
        inputs=[state_pdf_image_paths],
        outputs=[status_textbox] + all_slide_images,
        show_progress="hidden",
    ).success(lambda: gr.update(selected=2), outputs=tabs_widget)  # Switch to Tab 3

    # Step 3 Click Handler
    step3_inputs = (
        [state_temp_dir, state_md_path, state_slides_data]
        + all_code_editors
        + all_notes_textboxes
    )
    step3_outputs = (
        [
            status_textbox,
            state_audio_dir,
            btn_generate_video,  # Enable button in Tab 4
            btn_generate_audio,  # Disable self
        ]
        + all_audio_players
        + all_regen_buttons
    )
    btn_generate_audio.click(
        fn=step3_generate_audio,
        inputs=step3_inputs,
        outputs=step3_outputs,
        show_progress="full",
    ).success(
        # Only switch tabs: the status written by step3_generate_audio carries
        # the per-clip result (including partial failures) and must stay visible.
        fn=lambda: gr.update(selected=3),
        outputs=tabs_widget,
        show_progress="hidden",
    )

    # Step 4 Click Handler
    step4_inputs = [state_temp_dir, state_audio_dir, state_pdf_path]
    step4_outputs = [
        status_textbox,
        video_output,  # Update video output in Tab 4
        btn_generate_video,  # Disable self
    ]
    btn_generate_video.click(
        fn=step4_generate_video,
        inputs=step4_inputs,
        outputs=step4_outputs,
        show_progress="full",
    )
if __name__ == "__main__":
    # Create the cache directories up front.
    os.makedirs(CACHE_DIR, exist_ok=True)
    os.makedirs(URL_CACHE_DIR, exist_ok=True)

    # --- Check/Install Dependencies ---
    logger.info("Checking for external dependencies...")
    backslide_ok = check_and_install_npm_dependency(
        "bs",
        "backslide",
        "Please install Node.js/npm (https://nodejs.org/) and then run 'npm install -g backslide'",
    )
    decktape_ok = check_and_install_npm_dependency(
        "decktape",
        "decktape",
        "Please install Node.js/npm (https://nodejs.org/) and then run 'npm install -g decktape'",
    )
    # These checks run before the app is launched, outside any event handler,
    # so a gr.Warning toast cannot reach a browser here; log the problem instead.
    if not backslide_ok:
        logger.warning(
            "Backslide (bs) command check/install failed. PDF generation might fail. Check logs."
        )
    if not decktape_ok:
        logger.warning(
            "Decktape command check/install failed. PDF generation might fail. Check logs."
        )
    # --- End Dependency Check ---

    demo.queue().launch(debug=True)