Spaces:
Runtime error
Runtime error
| """ | |
| Build txtai workflows. | |
| Based on this example: https://github.com/neuml/txtai/blob/master/examples/workflows.py | |
| """ | |
| import os | |
| import nltk | |
| import yaml | |
| import pandas as pd | |
| import streamlit as st | |
| from txtai.embeddings import Documents, Embeddings | |
| from txtai.pipeline import Segmentation, Summary, Tabular, Textractor, Translation | |
| from txtai.workflow import ServiceTask, Task, UrlTask, Workflow | |
| class Process: | |
| """ | |
| Container for an active Workflow process instance. | |
| """ | |
| def get(components, data): | |
| """ | |
| Lookup or creates a new workflow process instance. | |
| Args: | |
| components: input components | |
| data: initial data, only passed when indexing | |
| Returns: | |
| Process | |
| """ | |
| process = Process(data) | |
| # Build workflow | |
| with st.spinner("Building workflow...."): | |
| process.build(components) | |
| return process | |
| def __init__(self, data): | |
| """ | |
| Creates a new Process. | |
| Args: | |
| data: initial data, only passed when indexing | |
| """ | |
| # Component options | |
| self.components = {} | |
| # Defined pipelines | |
| self.pipelines = {} | |
| # Current workflow | |
| self.workflow = [] | |
| # Embeddings index params | |
| self.embeddings = None | |
| self.documents = None | |
| self.data = data | |
| def build(self, components): | |
| """ | |
| Builds a workflow using components. | |
| Args: | |
| components: list of components to add to workflow | |
| """ | |
| # pylint: disable=W0108 | |
| tasks = [] | |
| for component in components: | |
| component = dict(component) | |
| wtype = component.pop("type") | |
| self.components[wtype] = component | |
| if wtype == "embeddings": | |
| self.embeddings = Embeddings({**component}) | |
| self.documents = Documents() | |
| tasks.append(Task(self.documents.add, unpack=False)) | |
| elif wtype == "segmentation": | |
| self.pipelines[wtype] = Segmentation(**self.components[wtype]) | |
| tasks.append(Task(self.pipelines[wtype])) | |
| elif wtype == "service": | |
| tasks.append(ServiceTask(**self.components[wtype])) | |
| elif wtype == "summary": | |
| self.pipelines[wtype] = Summary(component.pop("path")) | |
| tasks.append(Task(lambda x: self.pipelines["summary"](x, **self.components["summary"]))) | |
| elif wtype == "tabular": | |
| self.pipelines[wtype] = Tabular(**self.components[wtype]) | |
| tasks.append(Task(self.pipelines[wtype])) | |
| elif wtype == "textractor": | |
| self.pipelines[wtype] = Textractor(**self.components[wtype]) | |
| tasks.append(UrlTask(self.pipelines[wtype])) | |
| elif wtype == "translation": | |
| self.pipelines[wtype] = Translation() | |
| tasks.append(Task(lambda x: self.pipelines["translation"](x, **self.components["translation"]))) | |
| self.workflow = Workflow(tasks) | |
| def run(self, data): | |
| """ | |
| Runs a workflow using data as input. | |
| Args: | |
| data: input data | |
| """ | |
| if data and self.workflow: | |
| # Build tuples for embedding index | |
| if self.documents: | |
| data = [(x, element, None) for x, element in enumerate(data)] | |
| # Process workflow | |
| for result in self.workflow(data): | |
| if not self.documents: | |
| st.write(result) | |
| # Build embeddings index | |
| if self.documents: | |
| # Cache data | |
| self.data = list(self.documents) | |
| with st.spinner("Building embedding index...."): | |
| self.embeddings.index(self.documents) | |
| self.documents.close() | |
| # Clear workflow | |
| self.documents, self.pipelines, self.workflow = None, None, None | |
| def search(self, query): | |
| """ | |
| Runs a search. | |
| Args: | |
| query: input query | |
| """ | |
| if self.embeddings and query: | |
| st.markdown( | |
| """ | |
| <style> | |
| table td:nth-child(1) { | |
| display: none | |
| } | |
| table th:nth-child(1) { | |
| display: none | |
| } | |
| table {text-align: left !important} | |
| </style> | |
| """, | |
| unsafe_allow_html=True, | |
| ) | |
| limit = min(5, len(self.data)) | |
| results = [] | |
| for result in self.embeddings.search(query, limit): | |
| # Tuples are returned when an index doesn't have stored content | |
| if isinstance(result, tuple): | |
| uid, score = result | |
| results.append({"text": self.find(uid), "score": f"{score:.2}"}) | |
| else: | |
| if "id" in result and "text" in result: | |
| result["text"] = self.content(result.pop("id"), result["text"]) | |
| if "score" in result and result["score"]: | |
| result["score"] = f'{result["score"]:.2}' | |
| results.append(result) | |
| df = pd.DataFrame(results) | |
| st.write(df.to_html(escape=False), unsafe_allow_html=True) | |
| def find(self, key): | |
| """ | |
| Lookup record from cached data by uid key. | |
| Args: | |
| key: id to search for | |
| Returns: | |
| text for matching id | |
| """ | |
| # Lookup text by id | |
| text = [text for uid, text, _ in self.data if uid == key][0] | |
| return self.content(key, text) | |
| def content(self, uid, text): | |
| """ | |
| Builds a content reference for uid and text. | |
| Args: | |
| uid: record id | |
| text: record text | |
| Returns: | |
| content | |
| """ | |
| if uid and uid.lower().startswith("http"): | |
| return f"<a href='{uid}' rel='noopener noreferrer' target='blank'>{text}</a>" | |
| return text | |
| class Application: | |
| """ | |
| Main application. | |
| """ | |
| def __init__(self, directory): | |
| """ | |
| Creates a new application. | |
| """ | |
| # Workflow configuration directory | |
| self.directory = directory | |
| def default(self, names): | |
| """ | |
| Gets default workflow index. | |
| Args: | |
| names: list of workflow names | |
| Returns: | |
| default workflow index | |
| """ | |
| # Get names as lowercase to match case-insensitive | |
| lnames = [name.lower() for name in names] | |
| # Get default workflow param | |
| params = st.experimental_get_query_params() | |
| index = params.get("default") | |
| index = index[0].lower() if index else 0 | |
| # Lookup index of workflow name, add 1 to account for "--" | |
| if index and index in lnames: | |
| return lnames.index(index) + 1 | |
| # Workflow not found, default to index 0 | |
| return 0 | |
| def load(self, components): | |
| """ | |
| Load an existing workflow file. | |
| Args: | |
| components: list of components to load | |
| Returns: | |
| (names of components loaded, workflow config) | |
| """ | |
| with open(os.path.join(self.directory, "config.yml"), encoding="utf-8") as f: | |
| config = yaml.safe_load(f) | |
| names = [row["name"] for row in config] | |
| files = [row["file"] for row in config] | |
| selected = st.selectbox("Load workflow", ["--"] + names, self.default(names)) | |
| if selected != "--": | |
| index = [x for x, name in enumerate(names) if name == selected][0] | |
| with open(os.path.join(self.directory, files[index]), encoding="utf-8") as f: | |
| workflow = yaml.safe_load(f) | |
| st.markdown("---") | |
| # Get tasks for first workflow | |
| tasks = list(workflow["workflow"].values())[0]["tasks"] | |
| selected = [] | |
| for task in tasks: | |
| name = task.get("action", task.get("task")) | |
| if name in components: | |
| selected.append(name) | |
| elif name in ["index", "upsert"]: | |
| selected.append("embeddings") | |
| return (selected, workflow) | |
| return (None, None) | |
| def state(self, key): | |
| """ | |
| Lookup a session state variable. | |
| Args: | |
| key: variable key | |
| Returns: | |
| variable value | |
| """ | |
| if key in st.session_state: | |
| return st.session_state[key] | |
| return None | |
| def appsetting(self, workflow, name): | |
| """ | |
| Looks up an application configuration setting. | |
| Args: | |
| workflow: workflow configuration | |
| name: setting name | |
| Returns: | |
| app setting value | |
| """ | |
| if workflow: | |
| config = workflow.get("app") | |
| if config: | |
| return config.get(name) | |
| return None | |
| def setting(self, config, name, default=None): | |
| """ | |
| Looks up a component configuration setting. | |
| Args: | |
| config: component configuration | |
| name: setting name | |
| default: default setting value | |
| Returns: | |
| setting value | |
| """ | |
| return config.get(name, default) if config else default | |
| def text(self, label, component, config, name, default=None): | |
| """ | |
| Create a new text input field. | |
| Args: | |
| label: field label | |
| component: component name | |
| config: component configuration | |
| name: setting name | |
| default: default setting value | |
| Returns: | |
| text input field value | |
| """ | |
| default = self.setting(config, name, default) | |
| if not default: | |
| default = "" | |
| elif isinstance(default, list): | |
| default = ",".join(default) | |
| elif isinstance(default, dict): | |
| default = ",".join(default.keys()) | |
| st.caption(label) | |
| st.code(default, language="yaml") | |
| return default | |
| def number(self, label, component, config, name, default=None): | |
| """ | |
| Creates a new numeric input field. | |
| Args: | |
| label: field label | |
| component: component name | |
| config: component configuration | |
| name: setting name | |
| default: default setting value | |
| Returns: | |
| numeric value | |
| """ | |
| value = self.text(label, component, config, name, default) | |
| return int(value) if value else None | |
| def boolean(self, label, component, config, name, default=False): | |
| """ | |
| Creates a new checkbox field. | |
| Args: | |
| label: field label | |
| component: component name | |
| config: component configuration | |
| name: setting name | |
| default: default setting value | |
| Returns: | |
| boolean value | |
| """ | |
| default = self.setting(config, name, default) | |
| st.caption(label) | |
| st.markdown(":white_check_mark:" if default else ":white_large_square:") | |
| return default | |
| def select(self, label, component, config, name, options, default=0): | |
| """ | |
| Creates a new select box field. | |
| Args: | |
| label: field label | |
| component: component name | |
| config: component configuration | |
| name: setting name | |
| options: list of dropdown options | |
| default: default setting value | |
| Returns: | |
| boolean value | |
| """ | |
| index = self.setting(config, name) | |
| index = [x for x, option in enumerate(options) if option == default] | |
| # Derive default index | |
| default = index[0] if index else default | |
| st.caption(label) | |
| st.code(options[default], language="yaml") | |
| return options[default] | |
| def split(self, text): | |
| """ | |
| Splits text on commas and returns a list. | |
| Args: | |
| text: input text | |
| Returns: | |
| list | |
| """ | |
| return [x.strip() for x in text.split(",")] | |
| def options(self, component, workflow, index): | |
| """ | |
| Extracts component settings into a component configuration dict. | |
| Args: | |
| component: component type | |
| workflow: existing workflow, can be None | |
| index: task index | |
| Returns: | |
| dict with component settings | |
| """ | |
| # pylint: disable=R0912, R0915 | |
| options = {"type": component} | |
| # Lookup component configuration | |
| # - Runtime components have config defined within tasks | |
| # - Pipeline components have config defined at workflow root | |
| config = None | |
| if workflow: | |
| if component in ["service", "translation"]: | |
| # Service config is found in tasks section | |
| tasks = list(workflow["workflow"].values())[0]["tasks"] | |
| tasks = [task for task in tasks if task.get("task") == component or task.get("action") == component] | |
| if tasks: | |
| config = tasks[0] | |
| else: | |
| config = workflow.get(component) | |
| if component == "embeddings": | |
| st.markdown(f"** {index + 1}.) Embeddings Index** \n*Index workflow output*") | |
| options["path"] = self.text("Embeddings model path", component, config, "path", "sentence-transformers/nli-mpnet-base-v2") | |
| options["upsert"] = self.boolean("Upsert", component, config, "upsert") | |
| options["content"] = self.boolean("Content", component, config, "content") | |
| elif component in ("segmentation", "textractor"): | |
| if component == "segmentation": | |
| st.markdown(f"** {index + 1}.) Segment** \n*Split text into semantic units*") | |
| else: | |
| st.markdown(f"** {index + 1}.) Textract** \n*Extract text from documents*") | |
| options["sentences"] = self.boolean("Split sentences", component, config, "sentences") | |
| options["lines"] = self.boolean("Split lines", component, config, "lines") | |
| options["paragraphs"] = self.boolean("Split paragraphs", component, config, "paragraphs") | |
| options["join"] = self.boolean("Join tokenized", component, config, "join") | |
| options["minlength"] = self.number("Min section length", component, config, "minlength") | |
| elif component == "service": | |
| st.markdown(f"** {index + 1}.) Service** \n*Extract data from an API*") | |
| options["url"] = self.text("URL", component, config, "url") | |
| options["method"] = self.select("Method", component, config, "method", ["get", "post"], 0) | |
| options["params"] = self.text("URL parameters", component, config, "params") | |
| options["batch"] = self.boolean("Run as batch", component, config, "batch", True) | |
| options["extract"] = self.text("Subsection(s) to extract", component, config, "extract") | |
| if options["params"]: | |
| options["params"] = {key: None for key in self.split(options["params"])} | |
| if options["extract"]: | |
| options["extract"] = self.split(options["extract"]) | |
| elif component == "summary": | |
| st.markdown(f"** {index + 1}.) Summary** \n*Abstractive text summarization*") | |
| options["path"] = self.text("Model", component, config, "path", "sshleifer/distilbart-cnn-12-6") | |
| options["minlength"] = self.number("Min length", component, config, "minlength") | |
| options["maxlength"] = self.number("Max length", component, config, "maxlength") | |
| elif component == "tabular": | |
| st.markdown(f"** {index + 1}.) Tabular** \n*Split tabular data into rows and columns*") | |
| options["idcolumn"] = self.text("Id columns", component, config, "idcolumn") | |
| options["textcolumns"] = self.text("Text columns", component, config, "textcolumns") | |
| options["content"] = self.text("Content", component, config, "content") | |
| if options["textcolumns"]: | |
| options["textcolumns"] = self.split(options["textcolumns"]) | |
| if options["content"]: | |
| options["content"] = self.split(options["content"]) | |
| if len(options["content"]) == 1 and options["content"][0] == "1": | |
| options["content"] = options["content"][0] | |
| elif component == "translation": | |
| st.markdown(f"** {index + 1}.) Translate** \n*Machine translation*") | |
| options["target"] = self.text("Target language code", component, config, "args", "en") | |
| st.markdown("---") | |
| return options | |
| def yaml(self, components): | |
| """ | |
| Builds a yaml string for components. | |
| Args: | |
| components: list of components to export to YAML | |
| Returns: | |
| (workflow name, YAML string) | |
| """ | |
| data = {"app": {"data": self.state("data"), "query": self.state("query")}} | |
| tasks = [] | |
| name = None | |
| for component in components: | |
| component = dict(component) | |
| name = wtype = component.pop("type") | |
| if wtype == "embeddings": | |
| upsert = component.pop("upsert") | |
| data[wtype] = component | |
| data["writable"] = True | |
| name = "index" | |
| tasks.append({"action": "upsert" if upsert else "index"}) | |
| elif wtype == "segmentation": | |
| data[wtype] = component | |
| tasks.append({"action": wtype}) | |
| elif wtype == "service": | |
| config = dict(**component) | |
| config["task"] = wtype | |
| tasks.append(config) | |
| elif wtype == "summary": | |
| data[wtype] = {"path": component.pop("path")} | |
| tasks.append({"action": wtype}) | |
| elif wtype == "tabular": | |
| data[wtype] = component | |
| tasks.append({"action": wtype}) | |
| elif wtype == "textractor": | |
| data[wtype] = component | |
| tasks.append({"action": wtype, "task": "url"}) | |
| elif wtype == "translation": | |
| data[wtype] = {} | |
| tasks.append({"action": wtype, "args": list(component.values())}) | |
| # Add in workflow | |
| data["workflow"] = {name: {"tasks": tasks}} | |
| return (name, yaml.dump(data)) | |
| def data(self, workflow): | |
| """ | |
| Gets input data. | |
| Args: | |
| workflow: workflow configuration | |
| Returns: | |
| input data | |
| """ | |
| # Get default data setting | |
| data = self.appsetting(workflow, "data") | |
| if not self.appsetting(workflow, "query"): | |
| data = st.text_input("Input", value=data) | |
| # Save data state | |
| st.session_state["data"] = data | |
| # Wrap data as list for workflow processing | |
| return [data] | |
| def query(self, workflow, index): | |
| """ | |
| Gets input query. | |
| Args: | |
| workflow: workflow configuration | |
| index: True if this is an indexing workflow | |
| Returns: | |
| input query | |
| """ | |
| default = self.appsetting(workflow, "query") | |
| default = default if default else "" | |
| # Get query if this is an indexing workflow | |
| query = st.text_input("Query", value=default) if index else None | |
| # Save query state | |
| st.session_state["query"] = query | |
| return query | |
| def process(self, workflow, components, index): | |
| """ | |
| Processes the current application action. | |
| Args: | |
| workflow: workflow configuration | |
| components: workflow components | |
| index: True if this is an indexing workflow | |
| """ | |
| # Get input data and initialize query | |
| data = self.data(workflow) | |
| query = self.query(workflow, index) | |
| # Get workflow process | |
| process = Process.get(components, data if index else None) | |
| # Run workflow process | |
| process.run(data) | |
| # Run search | |
| if index: | |
| process.search(query) | |
| def run(self): | |
| """ | |
| Runs Streamlit application. | |
| """ | |
| with st.sidebar: | |
| st.markdown("# Workflow builder \n*Build and apply workflows to data* ") | |
| st.markdown("Test workflows for Station. Read more about used data on [Hugging Face](https://huggingface.co/datasets/ag_news) and in the [Docs](https://neuml.github.io/txtai/workflow/).") | |
| st.markdown("---") | |
| # Component configuration | |
| components = ["embeddings", "segmentation", "service", "summary", "tabular", "textractor", "translation"] | |
| selected, workflow = self.load(components) | |
| if selected: | |
| # Get selected options | |
| components = [self.options(component, workflow, x) for x, component in enumerate(selected)] | |
| if selected: | |
| # Process current action | |
| self.process(workflow, components, "embeddings" in selected) | |
| with st.sidebar: | |
| # Generate export button after workflow is complete | |
| _, config = self.yaml(components) | |
| st.download_button("Export", config, file_name="workflow.yml", help="Export the API workflow as YAML") | |
| else: | |
| st.info("Select a workflow from the sidebar") | |
| if __name__ == "__main__": | |
| os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
| # pylint: disable=W0702 | |
| try: | |
| nltk.sent_tokenize("This is a test. Split") | |
| except: | |
| nltk.download("punkt") | |
| # Create and run application | |
| app = Application("workflows") | |
| app.run() | |