Spaces:
Sleeping
Sleeping
| import google.generativeai as genai | |
| from typing import List | |
| from pathlib import Path | |
| import fitz | |
| import json | |
| import os | |
| import textwrap | |
| from settings import Chunk, Settings | |
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Folder scanned by read_source_files() for .txt / .md knowledge files.
# NOTE(review): SOURCE_DIR was referenced below but never defined in the
# reviewed version (NameError at runtime) — defined here; confirm location.
SOURCE_DIR = os.path.join(SCRIPT_DIR, "source")

# Output artifact locations (align with api.py expectations)
OUTPUT_CHUNKS_FILE = os.path.join(
    SCRIPT_DIR, "output_chunks.jsonl"
)  # already used in api.py
RAG_CONFIG_FILE = os.path.join(
    SCRIPT_DIR, "rag_prompt_config.jsonl"
)  # already used in api.py

# If you also want these in data/ instead, uncomment:
# OUTPUT_CHUNKS_FILE = os.path.join(DATA_DIR, "output_chunks.jsonl")
# RAG_CONFIG_FILE = os.path.join(DATA_DIR, "rag_prompt_config.jsonl")

# Example system / base prompts (edit as needed)
SYSTEM_PROMPT = {
    "role": "system",
    "content": "You are a helpful RAG assistant. Use only the provided context. If unsure, say you don't know.",
}
BASE_CHUNK = {
    "role": "base",
    "content": "Answer the user's query using only the contextual chunks below.",
}
def extract_pdf_text(filename: str) -> str:
    """Return the plain text of every page of *filename*, concatenated."""
    pages = []
    with fitz.open(filename) as doc:
        for page in doc:
            pages.append(page.get_text())
    return "".join(pages)
def chunk_pdf(filename: str) -> List[Chunk]:
    """Ask Gemini to split a PDF's extracted text into RAG-sized chunks.

    The model is constrained to JSON output matching
    ``Settings.response_schema``; the parsed result is returned as-is.
    """
    text = extract_pdf_text(filename)
    pdf_name = Path(filename).name
    prompt = f"""
Split the following text into coherent chunks suitable for RAG.
Each chunk should be 100-500 words.
Do not cut mid-sentence, paragraph, or table.
Preserve headings, bullet points, and tables.
Return an array of JSON objects with this structure:
{{
"content": "<chunk text>",
"source": "{pdf_name}",
"tags": [],
"type": "prg"
}}
Text:
{text}
"""
    # Create the client once — the original built a second, identical client
    # later in the function and never used the first one.
    client = genai.Client()
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config={
            "response_mime_type": "application/json",
            "response_schema": Settings.response_schema,
        },
    )
    # NOTE(review): `genai.Client` / `response.parsed` match the newer
    # google-genai SDK, but the file imports `google.generativeai` — confirm
    # which package is intended before running.
    chunks: List[Chunk] = response.parsed
    return chunks
def process_pdf_folder(folder_path):
    """Chunk every ``*.pdf`` in *folder_path* (sorted by name) via chunk_pdf.

    Prints a notice and returns an empty list when no PDFs are found;
    otherwise returns the concatenated chunk lists for all PDFs.
    """
    folder = Path(folder_path)
    # sorted() instead of list()+sort(): deterministic order, no redundant else.
    pdfs = sorted(folder.glob("*.pdf"), key=lambda p: p.name)
    if not pdfs:
        print(f"No PDF files found in {folder_path}")
        return []
    all_chunks = []
    for pdf_file in pdfs:
        print(f"Processing PDF: {pdf_file.name}")
        all_chunks.extend(chunk_pdf(filename=pdf_file))
    return all_chunks
def make_prg_chunk(text, filename):
    """Wrap raw .prg source text in a single-element list of chunk dicts."""
    chunk = {
        "content": text.strip(),
        "source": Path(filename).name,
        "tags": [],
        "type": "prg",
    }
    return [chunk]
def process_prg_folder(folder_path):
    """Read every ``*.prg`` file in *folder_path* and wrap each as one chunk."""
    collected = []
    prg_files = list(Path(folder_path).glob("*.prg"))
    if not prg_files:
        print(f"No .prg files found in {folder_path}")
        return []
    prg_files.sort(key=lambda p: p.name)
    for prg in prg_files:
        print(f"Processing PRG: {prg.name}")
        source_text = prg.read_text(encoding="utf-8", errors="ignore")
        collected.extend(make_prg_chunk(source_text, prg.name))
    return collected
def read_source_files(source_dir=None):
    """Load all .txt / .md files from *source_dir* (default: SOURCE_DIR).

    Returns a list of ``(filename, contents)`` tuples in sorted filename
    order. If no matching files exist, a small demo file is written into
    the folder and returned instead, so downstream steps always have input.

    The folder is created if missing — the original raised
    FileNotFoundError on a fresh checkout.
    """
    if source_dir is None:
        # NOTE(review): confirm SOURCE_DIR is defined at module level; it was
        # missing in the reviewed version of this file.
        source_dir = SOURCE_DIR
    os.makedirs(source_dir, exist_ok=True)
    files = []
    # sorted(): os.listdir order is platform-arbitrary; chunk ids downstream
    # depend on this order, so make it deterministic.
    for name in sorted(os.listdir(source_dir)):
        if name.lower().endswith((".txt", ".md")):
            path = os.path.join(source_dir, name)
            with open(path, "r", encoding="utf-8") as f:
                files.append((name, f.read()))
    if not files:
        # Provide a fallback demo file if none exist
        demo_path = os.path.join(source_dir, "demo.txt")
        demo_text = (
            "This is a demo knowledge file.\n"
            "Add your project or domain documentation as .txt or .md files here."
        )
        with open(demo_path, "w", encoding="utf-8") as f:
            f.write(demo_text)
        files.append(("demo.txt", demo_text))
    return files
def chunk_text(text: str, max_chars: int = 1200, overlap: int = 150):
    """Split *text* into overlapping character windows.

    Each chunk is at most *max_chars* characters and consecutive chunks
    share *overlap* trailing/leading characters. Returns a list of
    stripped, non-empty chunk strings; empty input yields ``[]``.

    Raises:
        ValueError: if ``max_chars <= 0`` or ``overlap >= max_chars`` —
            with those arguments the original loop never advanced past
            position 0 and spun forever.
    """
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if overlap < 0 or overlap >= max_chars:
        raise ValueError("overlap must satisfy 0 <= overlap < max_chars")
    text = text.strip()
    if not text:
        return []
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + max_chars)
        piece = text[start:end].strip()
        if piece:  # drop windows that are pure whitespace
            chunks.append(piece)
        if end >= len(text):
            break
        start = end - overlap  # always > 0 because overlap < max_chars
    return chunks
def build_chunks():
    """Create id/source/content chunk objects suitable for embedding."""
    records = []
    next_id = 0
    for filename, content in read_source_files():
        for piece in chunk_text(content):
            records.append({"id": next_id, "source": filename, "content": piece})
            next_id += 1
    return records
def write_jsonl(path: str, records):
    """Write *records* to *path* as JSON Lines (one object per line, UTF-8)."""
    with open(path, "w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in records
        )
def write_config(path: str):
    """Write system + base prompt config file (list with single object)."""
    payload = [{"system_prompt": SYSTEM_PROMPT, "base_chunk": BASE_CHUNK}]
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)
def main():
    """Script entry point: chunk PDFs and .prg files, then the text-source
    pipeline, writing all artifacts to disk."""
    # NOTE(review): hard-coded absolute Windows paths — switch to the
    # commented sys.argv lines below before running anywhere else.
    pdf_folder = r"C:\Users\kogut\Python\Assembler_rag\data\pdfs"
    prg_folder = r"C:\Users\kogut\Python\Assembler_rag\data\prg"
    # pdf_folder = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("./data/pdfs")
    # prg_folder = Path(sys.argv[2]) if len(sys.argv) > 2 else None
    output_jsonl = "output_chunks.jsonl"
    all_chunks = process_pdf_folder(pdf_folder)
    if prg_folder:
        all_chunks += process_prg_folder(prg_folder)
    # Writes a single JSON array despite the .jsonl extension, and the path is
    # resolved against the CWD. NOTE(review): if the CWD is the script dir,
    # this file is immediately overwritten by write_jsonl(OUTPUT_CHUNKS_FILE,
    # ...) below with different content — confirm both outputs are intended.
    with open(output_jsonl, "w", encoding="utf-8") as f:
        json.dump(all_chunks, f, ensure_ascii=False, indent=2)
    print(f"Finished. {len(all_chunks)} total chunks written to {output_jsonl}")
    # Second pipeline: .txt/.md sources -> JSONL chunks + prompt config.
    # NOTE(review): confirm SOURCE_DIR is defined at module level — it was
    # missing in the reviewed version, so this line would raise NameError.
    print(f"Generating RAG data from: {SOURCE_DIR}")
    chunks = build_chunks()
    print(f"Built {len(chunks)} chunks")
    write_jsonl(OUTPUT_CHUNKS_FILE, chunks)
    write_config(RAG_CONFIG_FILE)
    print(f"Wrote chunks to: {OUTPUT_CHUNKS_FILE}")
    print(f"Wrote config to: {RAG_CONFIG_FILE}")
    print("Done.")


if __name__ == "__main__":
    main()