From 359a84105c0ae5a49d52a4ce1ca5973cd874b252 Mon Sep 17 00:00:00 2001 From: Riley LePrell <150089004+RileyLePrell@users.noreply.github.com> Date: Tue, 10 Dec 2024 17:29:52 -0500 Subject: [PATCH] Streamlit Hosting + Removal of Junk Files I had previously been running this app locally and having people set up their API keys in a .env file; to make setup as easy as possible, I converted it to a Streamlit application and removed the now-unused utility files. --- Preprocessing/.streamlit/config.toml | 21 +- Preprocessing/App/main.py | 489 ++++++++++++------ Preprocessing/docker/requirements.txt | 3 +- .../audio_transcription.py | 20 +- .../chunking_vector_embedding.py | 55 +- .../preprocessing_pipeline/pdf_conversion.py | 29 +- .../preprocessing_pipeline/text_cleaning.py | 105 ++-- Preprocessing/utils/azure_blob_utils.py | 131 +++-- Preprocessing/utils/env_setup.py | 12 - Preprocessing/utils/file_utils.py | 0 Preprocessing/utils/metadata_utils.py | 0 11 files changed, 599 insertions(+), 266 deletions(-) delete mode 100644 Preprocessing/utils/env_setup.py delete mode 100644 Preprocessing/utils/file_utils.py delete mode 100644 Preprocessing/utils/metadata_utils.py diff --git a/Preprocessing/.streamlit/config.toml b/Preprocessing/.streamlit/config.toml index f41f6e6f..cee02fc0 100644 --- a/Preprocessing/.streamlit/config.toml +++ b/Preprocessing/.streamlit/config.toml @@ -1,2 +1,21 @@ [server] -maxUploadSize = 1000 # Set the upload size limit in MB +headless = true +port = 8501 # Default port for local testing +enableCORS = false +enableXsrfProtection = false +maxUploadSize = 1000 + + [theme] base = "light" primaryColor = "#0D6051" secondaryBackgroundColor = "#f0f2e9" textColor = "#263d36" font = "IBM Plex Mono" [global] pageTitle = "Minute Mate" favicon = "assets/favicon.ico" [home] welcomeMessage = "Welcome to Minute Mate: Your Meeting Transcription and Summarization Tool!" 
diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py index d6f339f8..5d373bb5 100644 --- a/Preprocessing/App/main.py +++ b/Preprocessing/App/main.py @@ -1,20 +1,16 @@ # Standard Python imports -import os import sys +import os +from pathlib import Path from datetime import datetime -# Load environment variables and set Python path -from dotenv import load_dotenv -load_dotenv() - -# Set PYTHONPATH from .env if available -python_path = os.getenv("PYTHONPATH") -if python_path: - sys.path.append(python_path) +# Dynamically add the parent directory to PYTHONPATH +sys.path.append(str(Path(__file__).resolve().parent.parent)) # Import dependencies import streamlit as st import weaviate # Import Weaviate client +from azure.storage.blob import BlobServiceClient from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text from preprocessing_pipeline.audio_transcription import transcribe_audio from preprocessing_pipeline.text_cleaning import clean_text @@ -26,15 +22,46 @@ from utils.azure_blob_utils import ( upload_to_azure, download_from_azure, - list_blobs_in_folder + list_blobs_in_folder, + get_blob_service_clients, # Use this standardized function ) -# Set up Weaviate client -client = weaviate.Client( - url=os.getenv("WEAVIATE_URL"), - auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY")) +# Helper function: Initialize Weaviate client +def get_weaviate_client(): + api_keys = st.session_state.get("api_keys", {}) + weaviate_url = api_keys.get("WEAVIATE_URL") + weaviate_api_key = api_keys.get("WEAVIATE_API_KEY") + + if not weaviate_url or not weaviate_api_key: + st.error("Weaviate API configuration is missing. Please set it on the Home Page.") + return None + + return weaviate.Client( + url=weaviate_url, + auth_client_secret=weaviate.AuthApiKey(api_key=weaviate_api_key) ) +# Helper function: Validate API Keys +def are_api_keys_set(): + """ + Validates that all required API keys are present and non-empty in the session state. + """ + required_keys = [ + "OPENAI_API_KEY", + "WEAVIATE_URL", + "WEAVIATE_API_KEY", + "ASSEMBLY_AI_KEY", + "AZURE_STORAGE_CONNECTION_STRING", + "AZURE_STORAGE_CONTAINER" + ] + return all( + key in st.session_state.get("api_keys", {}) and st.session_state["api_keys"][key] + for key in required_keys + ) + +# Initialize clients dynamically +client = None + # Helper function: Generate standardized file names def generate_file_name(metadata, stage): meeting_date = metadata["meeting_date"].strftime("%Y_%m_%d") @@ -51,6 +78,16 @@ def save_file_with_overwrite(file_path, content): # Helper function: Fetch documents from Weaviate def fetch_uploaded_documents(): + """ + Fetches documents stored in Weaviate. + Returns: + list: List of uploaded documents with metadata. + """ + client = get_weaviate_client() + if not client: + st.error("Weaviate client is not initialized. Please configure API keys.") + return [] + query = """ { Get { @@ -65,13 +102,17 @@ def fetch_uploaded_documents(): } } """ - response = client.query.raw(query) - documents = response.get("data", {}).get("Get", {}).get("Documents", []) - return documents + try: + response = client.query.raw(query) + documents = response.get("data", {}).get("Get", {}).get("Documents", []) + return documents + except Exception as e: + st.error(f"Error fetching documents from Weaviate: {e}") + return [] # Home Page def home_page(): - # Custom styling with IBM Plex Mono + # Custom styling for the homepage st.markdown(""" """, unsafe_allow_html=True) @@ -122,48 +171,168 @@ def home_page():

Minute Mate

- Welcome to Minute Mate; this is a staff-level application to upload meeting audios, minutes, and agendas to provide further context to the front end. + Welcome to Minute Mate! Use the sidebar to configure your API keys and get started. + Once configured, navigate using the buttons below to upload files or view documents.

""", unsafe_allow_html=True) - # Navigation buttons + # Sidebar for API Key Configuration and Instructions + st.sidebar.header("Setup") + + # Collapsible section for API Key Configuration + with st.sidebar.expander("API Key Configuration", expanded=True): + st.subheader("Submit Your API Keys") + with st.form(key="api_key_form"): + # OpenAI Keys + openai_api_key = st.text_input("OpenAI API Key", type="password") + openai_base_url = st.text_input("OpenAI Base URL", value="https://api.openai.com/v1") + + # Weaviate Keys + weaviate_url = st.text_input("Weaviate URL", type="password") + weaviate_api_key = st.text_input("Weaviate API Key", type="password") + + # AssemblyAI Key + assembly_ai_key = st.text_input("AssemblyAI API Key", type="password") + + # Azure Keys + azure_connection_string = st.text_area("Azure Storage Connection String") + azure_container_name = st.text_input("Azure Storage Container Name", type="password") + + submit_button = st.form_submit_button("Save API Keys") + + if submit_button: + st.session_state["api_keys"] = { + "OPENAI_API_KEY": openai_api_key, + "OPENAI_BASE_URL": openai_base_url, + "WEAVIATE_URL": weaviate_url, + "WEAVIATE_API_KEY": weaviate_api_key, + "ASSEMBLY_AI_KEY": assembly_ai_key, + "AZURE_STORAGE_CONNECTION_STRING": azure_connection_string, + "AZURE_STORAGE_CONTAINER": azure_container_name + } + st.success("API Keys saved successfully!") + st.rerun() + + # Collapsible section for How to Get API Keys + with st.sidebar.expander("How to Get API Keys", expanded=False): + st.subheader("API Key Setup Instructions") + st.markdown(""" + - **OpenAI** + - [Get your OpenAI API Key](https://platform.openai.com/account/api-keys) + - Set `OPENAI_API_KEY` in the sidebar. + - For `OPENAI_BASE_URL`, use `https://api.openai.com/v1` or leave it blank. + + - **Weaviate** + - [Access your Weaviate Cluster details](https://console.weaviate.cloud/cluster-details) + - Follow [this guide](https://weaviate.io/developers/wcs/create-instance) to create a new cluster if needed. + - Set `WEAVIATE_URL` with the REST endpoint and `WEAVIATE_API_KEY` with the admin key. + + - **AssemblyAI** + - [Create an AssemblyAI account](https://www.assemblyai.com/app) + - Copy your API key from the homepage and set it in `ASSEMBLY_AI_KEY`. + + - **Azure** + - [Create a storage account](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) + - Go to the Access Keys section in Azure and copy the connection string into `AZURE_STORAGE_CONNECTION_STRING`. + - Specify the container name in `AZURE_STORAGE_CONTAINER_NAME`. + """) + + # Navigation Buttons with Validation col1, col2 = st.columns([1, 1]) - with col1: - if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"): - st.session_state.page = "upload" - with col2: - if st.button("View Documents", key="view", help="View the documents that have been uploaded"): - st.session_state.page = "view" + if are_api_keys_set(): + try: + client = get_weaviate_client() + blob_service_client, container_client = get_blob_service_clients() + + # Validate connections + if not client: + st.error("Failed to connect to Weaviate. Please check your API configuration.") + return + if not blob_service_client or not container_client: + st.error("Failed to connect to Azure Blob Storage. 
Please check your API configuration.") + return + + with col1: + if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"): + st.session_state.page = "upload" + with col2: + if st.button("View Documents", key="view", help="View the documents that have been uploaded"): + st.session_state.page = "view" + + except Exception as e: + st.error(f"Error validating API keys: {e}") + else: + st.warning("API Keys must be configured to access other pages.") + with col1: + st.button("Upload Files", key="upload_disabled", disabled=True) + with col2: + st.button("View Documents", key="view_disabled", disabled=True) -# Upload Files Page def upload_files_page(): st.title("Upload Municipal Meeting Documents") - - # Sidebar for metadata and options selection - st.sidebar.header("Document Metadata & Transcription Options") - meeting_date = st.sidebar.date_input("Select Meeting Date", datetime.today()) - meeting_type = st.sidebar.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"]) - file_type = st.sidebar.radio("File Type", ["Agenda", "Minutes", "Audio"]) - model_option = st.sidebar.selectbox("Select Transcription Model", ["default", "best", "nano"]) - speaker_labels = st.sidebar.checkbox("Enable Speaker Diarization") - - # Save metadata - if st.sidebar.button("Save Metadata"): - st.session_state["metadata"] = { - "meeting_date": meeting_date, - "meeting_type": meeting_type, - "file_type": file_type, - "model": model_option, - "speaker_labels": speaker_labels - } + # Sidebar Configuration + st.sidebar.header("Upload File Configuration") + + # Collapsible section for Document Metadata and Transcription Options + with st.sidebar.expander("Document Metadata & Transcription Options", expanded=True): + st.subheader("Document Metadata") + meeting_date = st.date_input("Select Meeting Date", datetime.today()) + meeting_type = st.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"]) + file_type = st.radio("File Type", ["Agenda", "Minutes", "Audio"]) + model_option = st.selectbox("Select Transcription Model", ["default", "best", "nano"]) + speaker_labels = st.checkbox("Enable Speaker Diarization") + + # Save metadata into session state + if st.button("Save Metadata", key="save_metadata"): + st.session_state["metadata"] = { + "meeting_date": meeting_date, + "meeting_type": meeting_type, + "file_type": file_type, + "model": model_option, + "speaker_labels": speaker_labels + } + st.success("Metadata saved successfully!") + + # Collapsible section to display Saved API Keys + with st.sidebar.expander("Saved API Keys", expanded=False): + st.subheader("API Keys in Use") + if "api_keys" in st.session_state: + api_keys = st.session_state["api_keys"] + st.markdown(f""" + - **OpenAI API Key**: {api_keys.get("OPENAI_API_KEY", "Not Set")} + - **OpenAI Base URL**: {api_keys.get("OPENAI_BASE_URL", "Not Set")} + - **Weaviate URL**: {api_keys.get("WEAVIATE_URL", "Not Set")} + - **Weaviate API Key**: {api_keys.get("WEAVIATE_API_KEY", "Not Set")} + - **AssemblyAI API Key**: {api_keys.get("ASSEMBLY_AI_KEY", "Not Set")} + - **Azure Connection String**: {api_keys.get("AZURE_STORAGE_CONNECTION_STRING", "Not Set")} + - **Azure Container Name**: {api_keys.get("AZURE_STORAGE_CONTAINER", "Not Set")} + """) + else: + st.warning("No API keys found. 
Please configure them on the Home Page.") + + # Initialize Azure Blob Storage and Weaviate clients + try: + blob_service_client, container_client = get_blob_service_clients() + weaviate_client = get_weaviate_client() + + if not blob_service_client or not container_client or not weaviate_client: + st.error("API key configurations are incomplete. Please configure all keys on the Home Page.") + return + + except Exception as e: + st.error(f"Error initializing clients: {e}") + return + + # Main Upload Section st.header("Upload New Document") file = st.file_uploader("Choose a file to upload", type=["pdf", "mp3", "wav"]) # Initialize progress bar progress_bar = st.progress(0) + # Handle file upload if file and "metadata" in st.session_state: metadata = st.session_state["metadata"] @@ -171,67 +340,71 @@ def upload_files_page(): file_extension = os.path.splitext(file.name)[1] raw_file_name = f"{generate_file_name(metadata, 'Raw')}{file_extension}" - # Stage 1: Upload to Raw - upload_to_azure("raw", raw_file_name, file.read()) - st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}") - progress_bar.progress(20) - - # Stage 2: Process based on file type - if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]: - with st.spinner(f"Transcribing audio using {metadata['model']} model..."): - transcribed_text = transcribe_audio( - raw_file_name=raw_file_name, - model=metadata["model"], - speaker_labels=metadata["speaker_labels"] + try: + # Upload the file to Azure Blob Storage + upload_to_azure("raw", raw_file_name, file.read()) + st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}") + progress_bar.progress(20) + + # Stage 2: Process based on file type + if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]: + with st.spinner(f"Transcribing audio using {metadata['model']} model..."): + transcribed_text = transcribe_audio( + raw_file_name=raw_file_name, + model=metadata["model"], + speaker_labels=metadata["speaker_labels"] + ) + if transcribed_text: + dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt" + upload_to_azure("dirty", dirty_file_name, transcribed_text) + st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}") + st.text_area("Transcribed Audio Text:", transcribed_text, height=200) + st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name) + else: + st.error("Failed to transcribe the audio.") + + elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf": + with st.spinner("Extracting text from PDF..."): + extracted_text = convert_pdf_to_text(raw_file_name) + if extracted_text: + dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt" + upload_to_azure("dirty", dirty_file_name, extracted_text) + st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}") + st.text_area("Extracted PDF Text:", extracted_text, height=200) + st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name) + else: + st.error("Failed to extract text from the PDF.") + + # Stage 3: Clean Text and Upload to Clean + dirty_content = download_from_azure("dirty", dirty_file_name) + with st.spinner("Cleaning text using generative AI..."): + cleaned_text = clean_text(dirty_file_name) # Pass the actual content + clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt" + upload_to_azure("clean", clean_file_name, cleaned_text) + st.write(f"Uploaded cleaned text to `clean/` folder: 
{clean_file_name}") + + # Stage 4: Check and Delete Existing Embeddings + with st.spinner("Checking for existing embeddings in Weaviate..."): + matching_chunks = fetch_matching_chunks( + str(metadata["meeting_date"]), + metadata["meeting_type"], + metadata["file_type"], + clean_file_name ) - if transcribed_text: - dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt" - upload_to_azure("dirty", dirty_file_name, transcribed_text) - st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}") - st.text_area("Transcribed Audio Text:", transcribed_text, height=200) - st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name) - else: - st.error("Failed to transcribe the audio.") - - elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf": - with st.spinner("Extracting text from PDF..."): - extracted_text = convert_pdf_to_text(raw_file_name) - if extracted_text: - dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt" - upload_to_azure("dirty", dirty_file_name, extracted_text) - st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}") - st.text_area("Extracted PDF Text:", extracted_text, height=200) - st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name) - else: - st.error("Failed to extract text from the PDF.") - - # Stage 3: Clean Text and Upload to Clean - dirty_content = download_from_azure("dirty", dirty_file_name) - with st.spinner("Cleaning text using generative AI..."): - cleaned_text = clean_text(dirty_file_name) - clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt" - upload_to_azure("clean", clean_file_name, cleaned_text) - st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}") - - # Stage 4: Check and Delete Existing Embeddings - with st.spinner("Checking for existing embeddings..."): - matching_chunks = fetch_matching_chunks( - str(metadata["meeting_date"]), - metadata["meeting_type"], - metadata["file_type"], - clean_file_name - ) - if matching_chunks: - st.write(f"Found {len(matching_chunks)} existing chunks. Deleting...") - delete_matching_chunks(matching_chunks) - else: - st.write("No existing chunks found.") - - # Stage 5: Chunk and Embed into Weaviate - with st.spinner("Chunking and embedding text into Weaviate..."): - tokenize_and_embed_text(clean_file_name, metadata) - st.success("Document processed and embedded successfully!") - progress_bar.progress(100) + if matching_chunks: + st.write(f"Found {len(matching_chunks)} existing chunks. 
Deleting...") + delete_matching_chunks(matching_chunks) + else: + st.write("No existing chunks found.") + + # Stage 5: Chunk and Embed into Weaviate + with st.spinner("Chunking and embedding text into Weaviate..."): + tokenize_and_embed_text(clean_file_name, metadata) + st.success("Document processed and embedded successfully!") + progress_bar.progress(100) + + except Exception as e: + st.error(f"Error processing file: {e}") # Navigation buttons col1, col2 = st.columns([1, 1]) @@ -242,58 +415,70 @@ def upload_files_page(): if st.button("View Documents"): st.session_state.page = "view" -# View Documents Page + def view_documents_page(): - st.title("Uploaded Documents") + st.title("View Uploaded Files") + + # Sidebar Configuration + st.sidebar.header("View Documents Configuration") + + # Collapsible section to display Saved API Keys + with st.sidebar.expander("Saved API Keys", expanded=False): + st.subheader("API Keys in Use") + if "api_keys" in st.session_state: + api_keys = st.session_state["api_keys"] + st.markdown(f""" + - **OpenAI API Key**: {api_keys.get("OPENAI_API_KEY", "Not Set")} + - **OpenAI Base URL**: {api_keys.get("OPENAI_BASE_URL", "Not Set")} + - **Weaviate URL**: {api_keys.get("WEAVIATE_URL", "Not Set")} + - **Weaviate API Key**: {api_keys.get("WEAVIATE_API_KEY", "Not Set")} + - **AssemblyAI API Key**: {api_keys.get("ASSEMBLY_AI_KEY", "Not Set")} + - **Azure Connection String**: {api_keys.get("AZURE_STORAGE_CONNECTION_STRING", "Not Set")} + - **Azure Container Name**: {api_keys.get("AZURE_STORAGE_CONTAINER", "Not Set")} + """) + else: + st.warning("No API keys found. Please configure them on the Home Page.") + + # Fetch files and group them by folder and date try: - # Fetch blobs from each folder - raw_blobs = list_blobs_in_folder("raw") - dirty_blobs = list_blobs_in_folder("dirty") - clean_blobs = list_blobs_in_folder("clean") - - def group_blobs_by_date(blobs): - """Groups blobs by their date extracted from the file name.""" - grouped = {} - for blob in blobs: - try: - file_name = blob.split("/")[-1] # Extract the file name - parts = file_name.split("_") # Split into parts: ['2023', '12', '12', 'BOC', 'Agenda', ...] 
- date_str = "_".join(parts[:3]) # Join the first three parts: '2023_12_12' - readable_date = datetime.strptime(date_str, "%Y_%m_%d").strftime("%B %d, %Y") - if readable_date not in grouped: - grouped[readable_date] = [] - grouped[readable_date].append(blob) - except (ValueError, IndexError): - if "Unknown Date" not in grouped: - grouped["Unknown Date"] = [] - grouped["Unknown Date"].append(blob) - return grouped - - raw_grouped = group_blobs_by_date(raw_blobs) - dirty_grouped = group_blobs_by_date(dirty_blobs) - clean_grouped = group_blobs_by_date(clean_blobs) - - def display_grouped_blobs(grouped_blobs, category): - if grouped_blobs: - st.subheader(f"{category.capitalize()} Documents") - for date, blobs in grouped_blobs.items(): - with st.expander(f"Date: {date}", expanded=False): - for blob in blobs: - st.write(f"- {blob}") - if st.button(f"Download {blob}", key=f"download_{category}_{blob}"): - file_content = download_from_azure(category, blob) - st.download_button("Download", data=file_content, file_name=blob) - else: - st.info(f"No documents found in the {category} category.") - - display_grouped_blobs(raw_grouped, "raw") - display_grouped_blobs(dirty_grouped, "dirty") - display_grouped_blobs(clean_grouped, "clean") + raw_files = list_blobs_in_folder("raw") + dirty_files = list_blobs_in_folder("dirty") + clean_files = list_blobs_in_folder("clean") + + def display_grouped_files(folder_name, grouped_files): + """ + Display grouped files by date for a specific folder. + + Args: + folder_name (str): The name of the folder (raw, dirty, clean). + grouped_files (dict): Dictionary of grouped files by date. + """ + with st.expander(f"{folder_name.capitalize()} Files", expanded=False): + for date, files in grouped_files.items(): + st.markdown(f"**Date: {date}**") + for file_path in files: + file_name = file_path.split("/")[-1] + if st.button(f"Download {file_name}", key=f"{folder_name}_{file_name}_button"): + try: + file_content = download_from_azure(folder_name, file_name) + st.download_button( + label=f"Download {file_name}", + data=file_content, + file_name=file_name, + key=f"download_{folder_name}_{file_name}" + ) + except Exception as e: + st.error(f"Error downloading {file_name}: {e}") + + # Display files for each folder + display_grouped_files("clean", clean_files) + display_grouped_files("dirty", dirty_files) + display_grouped_files("raw", raw_files) except Exception as e: - st.error(f"Error fetching documents from Azure Blob Storage: {e}") + st.error(f"Error fetching files from Azure Blob Storage: {e}") - # Navigation buttons + # Navigation Buttons col1, col2 = st.columns([1, 1]) with col1: if st.button("Return Home"): diff --git a/Preprocessing/docker/requirements.txt b/Preprocessing/docker/requirements.txt index e90a5e3e..02da6eae 100644 --- a/Preprocessing/docker/requirements.txt +++ b/Preprocessing/docker/requirements.txt @@ -23,4 +23,5 @@ azure.storage.blob transformers chardet pytest -easyocr \ No newline at end of file +easyocr +tiktoken \ No newline at end of file diff --git a/Preprocessing/preprocessing_pipeline/audio_transcription.py b/Preprocessing/preprocessing_pipeline/audio_transcription.py index 9d836030..6fb3c0de 100644 --- a/Preprocessing/preprocessing_pipeline/audio_transcription.py +++ b/Preprocessing/preprocessing_pipeline/audio_transcription.py @@ -1,11 +1,15 @@ -import os import requests +import streamlit as st from utils.azure_blob_utils import download_from_azure -from utils.env_setup import load_env -# Load environment variables -load_env() -ASSEMBLY_AI_KEY = 
os.getenv("ASSEMBLY_AI_KEY") +# Dynamically fetch AssemblyAI API key from Streamlit session state +def get_assembly_ai_key(): + api_keys = st.session_state.get("api_keys", {}) + assembly_ai_key = api_keys.get("ASSEMBLY_AI_KEY") + if not assembly_ai_key: + raise ValueError("AssemblyAI API key is missing. Please configure it in the Streamlit app.") + return assembly_ai_key + ASSEMBLY_AI_ENDPOINT = "https://api.assemblyai.com/v2" def transcribe_audio(raw_file_name, model=None, speaker_labels=False): @@ -20,8 +24,11 @@ def transcribe_audio(raw_file_name, model=None, speaker_labels=False): Returns: - str: Transcribed text, or None if transcription fails. """ - headers = {"authorization": ASSEMBLY_AI_KEY} try: + # Fetch the AssemblyAI key dynamically + assembly_ai_key = get_assembly_ai_key() + headers = {"authorization": assembly_ai_key} + # Step 1: Download the raw audio file from Azure raw_content = download_from_azure("raw", raw_file_name, as_text=False) print(f"Downloaded {raw_file_name} from Azure for transcription.") @@ -82,3 +89,4 @@ def transcribe_audio(raw_file_name, model=None, speaker_labels=False): except Exception as e: print(f"Error during transcription: {e}") return None + diff --git a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py index 380d96cd..37107c9a 100644 --- a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py +++ b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py @@ -1,29 +1,35 @@ -import os -from openai import OpenAI +import streamlit as st +import requests import weaviate import tiktoken # Use tiktoken for OpenAI-compatible tokenization -from utils.env_setup import load_env from utils.azure_blob_utils import download_from_azure -# Load environment variables -load_env() -WEAVIATE_URL = os.getenv("WEAVIATE_URL") -WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY") -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +# Dynamic API Key Retrieval +def get_weaviate_client(): + api_keys = st.session_state.get("api_keys", {}) + weaviate_url = api_keys.get("WEAVIATE_URL") + weaviate_api_key = api_keys.get("WEAVIATE_API_KEY") -# Initialize Weaviate client -client = weaviate.Client( - url=WEAVIATE_URL, - auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY) + if not weaviate_url or not weaviate_api_key: + raise ValueError("Weaviate API configuration is missing. Please configure it in the Streamlit app.") + + return weaviate.Client( + url=weaviate_url, + auth_client_secret=weaviate.AuthApiKey(api_key=weaviate_api_key) ) -# Initialize OpenAI client for embedding -openai_client = OpenAI(api_key=OPENAI_API_KEY) +def get_openai_api_key(): + api_keys = st.session_state.get("api_keys", {}) + openai_api_key = api_keys.get("OPENAI_API_KEY") + + if not openai_api_key: + raise ValueError("OpenAI API key is missing. Please configure it in the Streamlit app.") + + return openai_api_key # Initialize tiktoken for OpenAI's embedding model tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002") - def fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document): """ Fetch matching chunks from Weaviate based on metadata. @@ -37,6 +43,7 @@ def fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document Returns: list: A list of matching documents. """ + client = get_weaviate_client() query = f""" {{ Get {{ @@ -67,6 +74,7 @@ def delete_matching_chunks(documents): Args: documents (list): List of documents with IDs to delete. 
""" + client = get_weaviate_client() for doc in documents: doc_id = doc["_additional"]["id"] client.data_object.delete(doc_id) @@ -83,6 +91,10 @@ def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250): max_chunk_size (int): Maximum token size for each chunk. """ try: + # Initialize clients dynamically + client = get_weaviate_client() + openai_api_key = get_openai_api_key() + # Download cleaned text from Azure clean_text = download_from_azure("clean", clean_file_name) tokens = tokenizer.encode(clean_text) @@ -107,8 +119,17 @@ def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250): # Embed and upload each chunk for i, chunk in enumerate(chunks): - response = openai_client.embeddings.create(input=chunk, model="text-embedding-ada-002") - embedding = response.data[0].embedding + # Request embedding from OpenAI + headers = {"Authorization": f"Bearer {openai_api_key}"} + response = requests.post( + "https://api.openai.com/v1/embeddings", + headers=headers, + json={"input": chunk, "model": "text-embedding-ada-002"} + ) + if response.status_code != 200: + raise ValueError(f"OpenAI embedding error: {response.status_code} - {response.text}") + + embedding = response.json()["data"][0]["embedding"] client.data_object.create( data_object={ diff --git a/Preprocessing/preprocessing_pipeline/pdf_conversion.py b/Preprocessing/preprocessing_pipeline/pdf_conversion.py index 7b3c7499..d9c7e77c 100644 --- a/Preprocessing/preprocessing_pipeline/pdf_conversion.py +++ b/Preprocessing/preprocessing_pipeline/pdf_conversion.py @@ -3,8 +3,10 @@ from PIL import Image from io import BytesIO import numpy as np +import streamlit as st from utils.azure_blob_utils import download_from_azure + def convert_pdf_to_text(raw_file_name): """ Extracts text from a PDF file. Uses EasyOCR as a fallback for scanned PDFs. @@ -17,33 +19,34 @@ def convert_pdf_to_text(raw_file_name): """ try: # Step 1: Download the raw file from Azure Blob Storage + print(f"Downloading {raw_file_name} from Azure Blob Storage (raw folder)...") raw_content = download_from_azure("raw", raw_file_name, as_text=False) - # Step 2: Open the PDF content + # Step 2: Open the PDF content using PyMuPDF (fitz) pdf_document = fitz.open(stream=raw_content, filetype="pdf") - text = "" - reader = easyocr.Reader(['en']) # Initialize EasyOCR for English + text = "" # Initialize a string to hold extracted text + reader = easyocr.Reader(['en'], gpu=False) # Initialize EasyOCR for English (disable GPU for portability) for page_num in range(pdf_document.page_count): page = pdf_document[page_num] - # Attempt to extract text directly + # Attempt to extract text directly from the page page_text = page.get_text() - if page_text.strip(): # If direct text is available - print(f"Text extracted directly from page {page_num + 1}.") + if page_text.strip(): # If direct text extraction is successful + print(f"Direct text extracted from page {page_num + 1}.") text += page_text else: # Fallback to OCR for scanned pages - print(f"Applying OCR on page {page_num + 1} of {raw_file_name}.") - pix = page.get_pixmap(dpi=300) # Render page to an image - img = Image.open(BytesIO(pix.tobytes("png"))) + print(f"Direct text extraction failed on page {page_num + 1}. 
Applying OCR.") + pix = page.get_pixmap(dpi=300) # Render the page as a high-resolution image + img = Image.open(BytesIO(pix.tobytes("png"))) # Convert rendered image to a PIL Image img_array = np.array(img) # Convert PIL Image to NumPy array for EasyOCR - ocr_text = reader.readtext(img_array, detail=0) # Extract text with EasyOCR - text += "\n".join(ocr_text) + ocr_text = reader.readtext(img_array, detail=0) # Perform OCR with EasyOCR + text += "\n".join(ocr_text) # Append the OCR results to the text string - pdf_document.close() + pdf_document.close() # Close the PDF document print(f"Successfully extracted text from {raw_file_name}.") return text except Exception as e: - print(f"Error in OCR for {raw_file_name}: {e}") + print(f"Error processing PDF {raw_file_name}: {e}") return None diff --git a/Preprocessing/preprocessing_pipeline/text_cleaning.py b/Preprocessing/preprocessing_pipeline/text_cleaning.py index a9912220..d4f3f3b5 100644 --- a/Preprocessing/preprocessing_pipeline/text_cleaning.py +++ b/Preprocessing/preprocessing_pipeline/text_cleaning.py @@ -1,16 +1,28 @@ -import os +import streamlit as st +import tiktoken # For OpenAI-compatible tokenization from openai import OpenAI -import tiktoken # Use tiktoken for OpenAI-compatible tokenization -from utils.env_setup import load_env -from utils.azure_blob_utils import download_from_azure, upload_to_azure - -# Load environment variables -load_env() -client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) +from utils.azure_blob_utils import download_from_azure # Initialize tiktoken for OpenAI's GPT models tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo") # Specify the OpenAI model + +def get_openai_client(): + """ + Retrieves the OpenAI client using the API key from Streamlit session state. + + Returns: + OpenAI: OpenAI client object. + """ + api_keys = st.session_state.get("api_keys", {}) + openai_api_key = api_keys.get("OPENAI_API_KEY") + + if not openai_api_key: + raise ValueError("OpenAI API Key is missing. Please configure it on the Home Page.") + + return OpenAI(api_key=openai_api_key) + + def tokenize_and_split_text(text, max_chunk_size=250): """ Tokenizes and splits text into smaller chunks within the token size limit. @@ -22,6 +34,10 @@ def tokenize_and_split_text(text, max_chunk_size=250): Returns: list of str: List of smaller text chunks. """ + # Validate text input + if not text or text.strip() == "": + raise ValueError("Text input is empty or invalid.") + # Tokenize the text into tokens tokens = tokenizer.encode(text) @@ -32,12 +48,14 @@ def tokenize_and_split_text(text, max_chunk_size=250): ] return chunks -def clean_text_chunk(chunk): + +def clean_text_chunk(chunk, openai_client): """ Cleans a single chunk of text using OpenAI GPT. Args: chunk (str): Text chunk to clean. + openai_client (OpenAI): OpenAI client instance. Returns: str: Cleaned text. 
@@ -51,13 +69,18 @@ def clean_text_chunk(chunk): {"role": "user", "content": f"Clean the following text for readability: {chunk}"} ] - response = client.chat.completions.create( - model="gpt-3.5-turbo", - messages=messages, - max_tokens=2000, - temperature=0.5 - ) - return response.choices[0].message.content.strip() + try: + response = openai_client.chat.completions.create( + model="gpt-3.5-turbo", + messages=messages, + max_tokens=2000, + temperature=0.5 + ) + return response.choices[0].message.content.strip() + except Exception as e: + print(f"Error during chunk cleaning: {e}") + return f"Error in chunk cleaning: {e}" + def clean_text(dirty_file_name): """ @@ -69,16 +92,40 @@ def clean_text(dirty_file_name): Returns: str: Combined cleaned text. """ - print(f"Downloading {dirty_file_name} from Azure Blob Storage...") - dirty_content = download_from_azure("dirty", dirty_file_name) - - # Tokenize and split the text into chunks of 250 tokens - chunks = tokenize_and_split_text(dirty_content, max_chunk_size=250) - cleaned_chunks = [] - - for i, chunk in enumerate(chunks): - print(f"Cleaning chunk {i + 1}/{len(chunks)}...") - cleaned_chunk = clean_text_chunk(chunk) - cleaned_chunks.append(cleaned_chunk) - - return "\n\n".join(cleaned_chunks) + try: + print(f"Downloading {dirty_file_name} from Azure Blob Storage (dirty folder)...") + dirty_content = download_from_azure("dirty", dirty_file_name) + + # Validate dirty content + if not dirty_content or dirty_content.strip() == "": + raise ValueError("The downloaded content is empty. Please check the file content.") + + # Initialize OpenAI client dynamically + openai_client = get_openai_client() + + # Tokenize and split the text into chunks + print("Tokenizing and splitting text into manageable chunks...") + chunks = tokenize_and_split_text(dirty_content, max_chunk_size=250) + cleaned_chunks = [] + + for i, chunk in enumerate(chunks): + print(f"Cleaning chunk {i + 1}/{len(chunks)}: {chunk[:100]}...") + try: + cleaned_chunk = clean_text_chunk(chunk, openai_client) + except Exception as e: + print(f"Error cleaning chunk {i + 1}: {e}") + cleaned_chunk = f"Error cleaning this chunk: {e}" + + if not cleaned_chunk.strip(): + print(f"Chunk {i + 1} returned empty after cleaning.") + raise ValueError(f"Chunk {i + 1} cleaning failed. Received empty content.") + + cleaned_chunks.append(cleaned_chunk) + + print(f"Successfully cleaned {len(chunks)} chunks.") + return "\n\n".join(cleaned_chunks) + + except Exception as e: + print(f"Error during text cleaning: {e}") + return None + diff --git a/Preprocessing/utils/azure_blob_utils.py b/Preprocessing/utils/azure_blob_utils.py index 34dd3569..fea990aa 100644 --- a/Preprocessing/utils/azure_blob_utils.py +++ b/Preprocessing/utils/azure_blob_utils.py @@ -1,64 +1,125 @@ from azure.storage.blob import BlobServiceClient -import os -from dotenv import load_dotenv import chardet -load_dotenv() # Load environment variables from .env file +import streamlit as st -# Set up the blob service client -connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING") -container_name = os.getenv("AZURE_STORAGE_CONTAINER") -blob_service_client = BlobServiceClient.from_connection_string(connection_string) -container_client = blob_service_client.get_container_client(container_name) +def get_blob_service_clients(): + """ + Initializes the Azure Blob Service Client and Container Client dynamically from `st.session_state`. 
+ + Returns: + tuple: (BlobServiceClient, ContainerClient) + """ + try: + api_keys = st.session_state.get("api_keys", {}) + connection_string = api_keys.get("AZURE_STORAGE_CONNECTION_STRING") + container_name = api_keys.get("AZURE_STORAGE_CONTAINER") + + if not connection_string: + raise ValueError("Azure Storage Connection String is missing. Please set it on the Home Page.") + if not container_name: + raise ValueError("Azure Storage Container Name is missing. Please set it on the Home Page.") + + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + container_client = blob_service_client.get_container_client(container_name) + return blob_service_client, container_client + except Exception as e: + print(f"Error initializing Azure Blob Service or Container Client: {e}") + raise e + +def list_blobs_in_folder(folder_name): + """ + List all blobs in a specific folder in Azure Blob Storage. + + Args: + folder_name (str): The folder to list blobs from. + + Returns: + dict: Dictionary where keys are dates, and values are lists of blob names for that date. + """ + try: + _, container_client = get_blob_service_clients() + blobs = container_client.list_blobs(name_starts_with=f"{folder_name}/") + grouped_blobs = {} + + for blob in blobs: + file_name = blob.name.split("/")[-1] # Extract the file name + if not file_name: # Skip empty folder paths + continue + parts = file_name.split("_")[:3] # Extract the date (e.g., 2023_11_14) + if len(parts) == 3: + date_key = "_".join(parts) # Format: YYYY_MM_DD + else: + date_key = "Unknown Date" + grouped_blobs.setdefault(date_key, []).append(blob.name) + + return grouped_blobs + except Exception as e: + print(f"Error listing blobs in folder {folder_name}: {e}") + raise e def upload_to_azure(folder_name, file_name, file_content): """ - Upload a file to Azure Blob Storage. + Uploads a file to a specified folder in Azure Blob Storage. Args: folder_name (str): The folder in the Azure container (e.g., raw, dirty, clean). file_name (str): The name of the file to upload. file_content (bytes): The binary content of the file to upload. + + Returns: + str: Success message with the uploaded file path. """ - blob_name = f"{folder_name}/{file_name}" - blob_client = container_client.get_blob_client(blob_name) - blob_client.upload_blob(file_content, overwrite=True) - print(f"Uploaded to Azure: {blob_name}") + try: + # Validate inputs + if not folder_name or not file_name: + raise ValueError("Folder name and file name cannot be empty.") + if not file_content: + raise ValueError("File content is empty or None.") + + # Initialize Azure Blob Service clients + _, container_client = get_blob_service_clients() + + # Construct the blob path + blob_name = f"{folder_name}/{file_name}" + blob_client = container_client.get_blob_client(blob_name) + + # Upload the file, overwriting if it already exists + blob_client.upload_blob(file_content, overwrite=True) + print(f"Successfully uploaded {file_name} to Azure at {blob_name}.") + return f"File successfully uploaded to: {blob_name}" + except Exception as e: + print(f"Error uploading {file_name} to Azure: {e}") + raise Exception(f"Failed to upload file {file_name}: {e}") + def download_from_azure(folder_name, file_name, as_text=True): """ - Download a file from Azure Blob Storage with streaming. - """ - blob_name = f"{folder_name}/{file_name}" - blob_client = container_client.get_blob_client(blob_name) + Download a file from Azure Blob Storage. 
- # Print the URL for debugging - print(f"Generated Blob URL: {blob_client.url}") + Args: + folder_name (str): The folder in the Azure container (e.g., clean, dirty, raw). + file_name (str): The name of the file to download. + as_text (bool): Whether to decode the file content as text or return binary content. + Returns: + str or bytes: The content of the file as text or binary. + """ try: + _, container_client = get_blob_service_clients() + blob_name = f"{folder_name}/{file_name}" + blob_client = container_client.get_blob_client(blob_name) downloader = blob_client.download_blob(max_concurrency=5) + if as_text: - # Read as binary first and detect encoding + # Read as binary first and detect encoding for text decoding raw_data = downloader.readall() detected_encoding = chardet.detect(raw_data)['encoding'] print(f"Detected encoding: {detected_encoding}") return raw_data.decode(detected_encoding) # Decode using detected encoding else: - print(f"Downloading {blob_name} as binary.") - return downloader.readall() # Return binary content + return downloader.readall() # Return binary content if `as_text` is False + except Exception as e: print(f"Error downloading blob {blob_name}: {e}") raise e - -def list_blobs_in_folder(folder_name): - """ - List all blobs in a specific folder in Azure Blob Storage. - - Args: - folder_name (str): The folder to list blobs from. - - Returns: - list: List of blob names. - """ - blobs = container_client.list_blobs(name_starts_with=f"{folder_name}/") - return [blob.name for blob in blobs] diff --git a/Preprocessing/utils/env_setup.py b/Preprocessing/utils/env_setup.py deleted file mode 100644 index acc1b513..00000000 --- a/Preprocessing/utils/env_setup.py +++ /dev/null @@ -1,12 +0,0 @@ -import os -import sys -from dotenv import load_dotenv - -def load_env(): - """ - Loads environment variables from a .env file and adds PYTHONPATH. - """ - load_dotenv() - python_path = os.getenv("PYTHONPATH") - if python_path: - sys.path.append(python_path) diff --git a/Preprocessing/utils/file_utils.py b/Preprocessing/utils/file_utils.py deleted file mode 100644 index e69de29b..00000000 diff --git a/Preprocessing/utils/metadata_utils.py b/Preprocessing/utils/metadata_utils.py deleted file mode 100644 index e69de29b..00000000