diff --git a/Preprocessing/.streamlit/config.toml b/Preprocessing/.streamlit/config.toml index f41f6e6f..cee02fc0 100644 --- a/Preprocessing/.streamlit/config.toml +++ b/Preprocessing/.streamlit/config.toml @@ -1,2 +1,21 @@ [server] -maxUploadSize = 1000 # Set the upload size limit in MB +headless = true +port = 8501 # Default port for local testing +enableCORS = false +enableXsrfProtection = false +maxUploadSize = 1000 + + +[theme] +base = "light" +primaryColor = "#0D6051" +secondaryBackgroundColor = "#f0f2e9" +textColor = "#263d36" +font = "IBM Plex Mono" + +[global] +pageTitle = "Minute Mate" +favicon = "assets/favicon.ico" + +[home] +welcomeMessage = "Welcome to Minute Mate: Your Meeting Transcription and Summarization Tool!" diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py index d6f339f8..5d373bb5 100644 --- a/Preprocessing/App/main.py +++ b/Preprocessing/App/main.py @@ -1,20 +1,16 @@ # Standard Python imports -import os import sys +import os +from pathlib import Path from datetime import datetime -# Load environment variables and set Python path -from dotenv import load_dotenv -load_dotenv() - -# Set PYTHONPATH from .env if available -python_path = os.getenv("PYTHONPATH") -if python_path: - sys.path.append(python_path) +# Dynamically add the parent directory to PYTHONPATH +sys.path.append(str(Path(__file__).resolve().parent.parent)) # Import dependencies import streamlit as st import weaviate # Import Weaviate client +from azure.storage.blob import BlobServiceClient from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text from preprocessing_pipeline.audio_transcription import transcribe_audio from preprocessing_pipeline.text_cleaning import clean_text @@ -26,15 +22,46 @@ from utils.azure_blob_utils import ( upload_to_azure, download_from_azure, - list_blobs_in_folder + list_blobs_in_folder, + get_blob_service_clients, # Use this standardized function ) -# Set up Weaviate client -client = weaviate.Client( - url=os.getenv("WEAVIATE_URL"), - auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY")) +# Helper function: Initialize Weaviate client +def get_weaviate_client(): + api_keys = st.session_state.get("api_keys", {}) + weaviate_url = api_keys.get("WEAVIATE_URL") + weaviate_api_key = api_keys.get("WEAVIATE_API_KEY") + + if not weaviate_url or not weaviate_api_key: + st.error("Weaviate API configuration is missing. Please set it on the Home Page.") + return None + + return weaviate.Client( + url=weaviate_url, + auth_client_secret=weaviate.AuthApiKey(api_key=weaviate_api_key) ) +# Helper function: Validate API Keys +def are_api_keys_set(): + """ + Validates that all required API keys are present and non-empty in the session state. + """ + required_keys = [ + "OPENAI_API_KEY", + "WEAVIATE_URL", + "WEAVIATE_API_KEY", + "ASSEMBLY_AI_KEY", + "AZURE_STORAGE_CONNECTION_STRING", + "AZURE_STORAGE_CONTAINER" + ] + return all( + key in st.session_state.get("api_keys", {}) and st.session_state["api_keys"][key] + for key in required_keys + ) + +# Initialize clients dynamically +client = None + # Helper function: Generate standardized file names def generate_file_name(metadata, stage): meeting_date = metadata["meeting_date"].strftime("%Y_%m_%d") @@ -51,6 +78,16 @@ def save_file_with_overwrite(file_path, content): # Helper function: Fetch documents from Weaviate def fetch_uploaded_documents(): + """ + Fetches documents stored in Weaviate. + Returns: + list: List of uploaded documents with metadata. 
+ """ + client = get_weaviate_client() + if not client: + st.error("Weaviate client is not initialized. Please configure API keys.") + return [] + query = """ { Get { @@ -65,13 +102,17 @@ def fetch_uploaded_documents(): } } """ - response = client.query.raw(query) - documents = response.get("data", {}).get("Get", {}).get("Documents", []) - return documents + try: + response = client.query.raw(query) + documents = response.get("data", {}).get("Get", {}).get("Documents", []) + return documents + except Exception as e: + st.error(f"Error fetching documents from Weaviate: {e}") + return [] # Home Page def home_page(): - # Custom styling with IBM Plex Mono + # Custom styling for the homepage st.markdown(""" """, unsafe_allow_html=True) @@ -122,48 +171,168 @@ def home_page():

Minute Mate

- Welcome to Minute Mate; this is a staff-level application to upload meeting audios, minutes, and agendas to provide further context to the front end.
+ Welcome to Minute Mate! Use the sidebar to configure your API keys and get started.
+ Once configured, navigate using the buttons below to upload files or view documents.

""", unsafe_allow_html=True) - # Navigation buttons + # Sidebar for API Key Configuration and Instructions + st.sidebar.header("Setup") + + # Collapsible section for API Key Configuration + with st.sidebar.expander("API Key Configuration", expanded=True): + st.subheader("Submit Your API Keys") + with st.form(key="api_key_form"): + # OpenAI Keys + openai_api_key = st.text_input("OpenAI API Key", type="password") + openai_base_url = st.text_input("OpenAI Base URL", value="https://api.openai.com/v1") + + # Weaviate Keys + weaviate_url = st.text_input("Weaviate URL", type="password") + weaviate_api_key = st.text_input("Weaviate API Key", type="password") + + # AssemblyAI Key + assembly_ai_key = st.text_input("AssemblyAI API Key", type="password") + + # Azure Keys + azure_connection_string = st.text_area("Azure Storage Connection String") + azure_container_name = st.text_input("Azure Storage Container Name", type="password") + + submit_button = st.form_submit_button("Save API Keys") + + if submit_button: + st.session_state["api_keys"] = { + "OPENAI_API_KEY": openai_api_key, + "OPENAI_BASE_URL": openai_base_url, + "WEAVIATE_URL": weaviate_url, + "WEAVIATE_API_KEY": weaviate_api_key, + "ASSEMBLY_AI_KEY": assembly_ai_key, + "AZURE_STORAGE_CONNECTION_STRING": azure_connection_string, + "AZURE_STORAGE_CONTAINER": azure_container_name + } + st.success("API Keys saved successfully!") + st.rerun() + + # Collapsible section for How to Get API Keys + with st.sidebar.expander("How to Get API Keys", expanded=False): + st.subheader("API Key Setup Instructions") + st.markdown(""" + - **OpenAI** + - [Get your OpenAI API Key](https://platform.openai.com/account/api-keys) + - Set `OPENAI_API_KEY` in the sidebar. + - For `OPENAI_BASE_URL`, use `https://api.openai.com/v1` or leave it blank. + + - **Weaviate** + - [Access your Weaviate Cluster details](https://console.weaviate.cloud/cluster-details) + - Follow [this guide](https://weaviate.io/developers/wcs/create-instance) to create a new cluster if needed. + - Set `WEAVIATE_URL` with the REST endpoint and `WEAVIATE_API_KEY` with the admin key. + + - **AssemblyAI** + - [Create an AssemblyAI account](https://www.assemblyai.com/app) + - Copy your API key from the homepage and set it in `ASSEMBLY_AI_KEY`. + + - **Azure** + - [Create a storage account](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) + - Go to the Access Keys section in Azure and copy the connection string into `AZURE_STORAGE_CONNECTION_STRING`. + - Specify the container name in `AZURE_STORAGE_CONTAINER_NAME`. + """) + + # Navigation Buttons with Validation col1, col2 = st.columns([1, 1]) - with col1: - if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"): - st.session_state.page = "upload" - with col2: - if st.button("View Documents", key="view", help="View the documents that have been uploaded"): - st.session_state.page = "view" + if are_api_keys_set(): + try: + client = get_weaviate_client() + blob_service_client, container_client = get_blob_service_clients() + + # Validate connections + if not client: + st.error("Failed to connect to Weaviate. Please check your API configuration.") + return + if not blob_service_client or not container_client: + st.error("Failed to connect to Azure Blob Storage. 
Please check your API configuration.") + return + + with col1: + if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"): + st.session_state.page = "upload" + with col2: + if st.button("View Documents", key="view", help="View the documents that have been uploaded"): + st.session_state.page = "view" + + except Exception as e: + st.error(f"Error validating API keys: {e}") + else: + st.warning("API Keys must be configured to access other pages.") + with col1: + st.button("Upload Files", key="upload_disabled", disabled=True) + with col2: + st.button("View Documents", key="view_disabled", disabled=True) -# Upload Files Page def upload_files_page(): st.title("Upload Municipal Meeting Documents") - - # Sidebar for metadata and options selection - st.sidebar.header("Document Metadata & Transcription Options") - meeting_date = st.sidebar.date_input("Select Meeting Date", datetime.today()) - meeting_type = st.sidebar.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"]) - file_type = st.sidebar.radio("File Type", ["Agenda", "Minutes", "Audio"]) - model_option = st.sidebar.selectbox("Select Transcription Model", ["default", "best", "nano"]) - speaker_labels = st.sidebar.checkbox("Enable Speaker Diarization") - - # Save metadata - if st.sidebar.button("Save Metadata"): - st.session_state["metadata"] = { - "meeting_date": meeting_date, - "meeting_type": meeting_type, - "file_type": file_type, - "model": model_option, - "speaker_labels": speaker_labels - } + # Sidebar Configuration + st.sidebar.header("Upload File Configuration") + + # Collapsible section for Document Metadata and Transcription Options + with st.sidebar.expander("Document Metadata & Transcription Options", expanded=True): + st.subheader("Document Metadata") + meeting_date = st.date_input("Select Meeting Date", datetime.today()) + meeting_type = st.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"]) + file_type = st.radio("File Type", ["Agenda", "Minutes", "Audio"]) + model_option = st.selectbox("Select Transcription Model", ["default", "best", "nano"]) + speaker_labels = st.checkbox("Enable Speaker Diarization") + + # Save metadata into session state + if st.button("Save Metadata", key="save_metadata"): + st.session_state["metadata"] = { + "meeting_date": meeting_date, + "meeting_type": meeting_type, + "file_type": file_type, + "model": model_option, + "speaker_labels": speaker_labels + } + st.success("Metadata saved successfully!") + + # Collapsible section to display Saved API Keys + with st.sidebar.expander("Saved API Keys", expanded=False): + st.subheader("API Keys in Use") + if "api_keys" in st.session_state: + api_keys = st.session_state["api_keys"] + st.markdown(f""" + - **OpenAI API Key**: {api_keys.get("OPENAI_API_KEY", "Not Set")} + - **OpenAI Base URL**: {api_keys.get("OPENAI_BASE_URL", "Not Set")} + - **Weaviate URL**: {api_keys.get("WEAVIATE_URL", "Not Set")} + - **Weaviate API Key**: {api_keys.get("WEAVIATE_API_KEY", "Not Set")} + - **AssemblyAI API Key**: {api_keys.get("ASSEMBLY_AI_KEY", "Not Set")} + - **Azure Connection String**: {api_keys.get("AZURE_STORAGE_CONNECTION_STRING", "Not Set")} + - **Azure Container Name**: {api_keys.get("AZURE_STORAGE_CONTAINER", "Not Set")} + """) + else: + st.warning("No API keys found. 
Please configure them on the Home Page.") + + # Initialize Azure Blob Storage and Weaviate clients + try: + blob_service_client, container_client = get_blob_service_clients() + weaviate_client = get_weaviate_client() + + if not blob_service_client or not container_client or not weaviate_client: + st.error("API key configurations are incomplete. Please configure all keys on the Home Page.") + return + + except Exception as e: + st.error(f"Error initializing clients: {e}") + return + + # Main Upload Section st.header("Upload New Document") file = st.file_uploader("Choose a file to upload", type=["pdf", "mp3", "wav"]) # Initialize progress bar progress_bar = st.progress(0) + # Handle file upload if file and "metadata" in st.session_state: metadata = st.session_state["metadata"] @@ -171,67 +340,71 @@ def upload_files_page(): file_extension = os.path.splitext(file.name)[1] raw_file_name = f"{generate_file_name(metadata, 'Raw')}{file_extension}" - # Stage 1: Upload to Raw - upload_to_azure("raw", raw_file_name, file.read()) - st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}") - progress_bar.progress(20) - - # Stage 2: Process based on file type - if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]: - with st.spinner(f"Transcribing audio using {metadata['model']} model..."): - transcribed_text = transcribe_audio( - raw_file_name=raw_file_name, - model=metadata["model"], - speaker_labels=metadata["speaker_labels"] + try: + # Upload the file to Azure Blob Storage + upload_to_azure("raw", raw_file_name, file.read()) + st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}") + progress_bar.progress(20) + + # Stage 2: Process based on file type + if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]: + with st.spinner(f"Transcribing audio using {metadata['model']} model..."): + transcribed_text = transcribe_audio( + raw_file_name=raw_file_name, + model=metadata["model"], + speaker_labels=metadata["speaker_labels"] + ) + if transcribed_text: + dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt" + upload_to_azure("dirty", dirty_file_name, transcribed_text) + st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}") + st.text_area("Transcribed Audio Text:", transcribed_text, height=200) + st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name) + else: + st.error("Failed to transcribe the audio.") + + elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf": + with st.spinner("Extracting text from PDF..."): + extracted_text = convert_pdf_to_text(raw_file_name) + if extracted_text: + dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt" + upload_to_azure("dirty", dirty_file_name, extracted_text) + st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}") + st.text_area("Extracted PDF Text:", extracted_text, height=200) + st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name) + else: + st.error("Failed to extract text from the PDF.") + + # Stage 3: Clean Text and Upload to Clean + dirty_content = download_from_azure("dirty", dirty_file_name) + with st.spinner("Cleaning text using generative AI..."): + cleaned_text = clean_text(dirty_file_name) # Pass the actual content + clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt" + upload_to_azure("clean", clean_file_name, cleaned_text) + st.write(f"Uploaded cleaned text to `clean/` folder: 
{clean_file_name}") + + # Stage 4: Check and Delete Existing Embeddings + with st.spinner("Checking for existing embeddings in Weaviate..."): + matching_chunks = fetch_matching_chunks( + str(metadata["meeting_date"]), + metadata["meeting_type"], + metadata["file_type"], + clean_file_name ) - if transcribed_text: - dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt" - upload_to_azure("dirty", dirty_file_name, transcribed_text) - st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}") - st.text_area("Transcribed Audio Text:", transcribed_text, height=200) - st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name) - else: - st.error("Failed to transcribe the audio.") - - elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf": - with st.spinner("Extracting text from PDF..."): - extracted_text = convert_pdf_to_text(raw_file_name) - if extracted_text: - dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt" - upload_to_azure("dirty", dirty_file_name, extracted_text) - st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}") - st.text_area("Extracted PDF Text:", extracted_text, height=200) - st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name) - else: - st.error("Failed to extract text from the PDF.") - - # Stage 3: Clean Text and Upload to Clean - dirty_content = download_from_azure("dirty", dirty_file_name) - with st.spinner("Cleaning text using generative AI..."): - cleaned_text = clean_text(dirty_file_name) - clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt" - upload_to_azure("clean", clean_file_name, cleaned_text) - st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}") - - # Stage 4: Check and Delete Existing Embeddings - with st.spinner("Checking for existing embeddings..."): - matching_chunks = fetch_matching_chunks( - str(metadata["meeting_date"]), - metadata["meeting_type"], - metadata["file_type"], - clean_file_name - ) - if matching_chunks: - st.write(f"Found {len(matching_chunks)} existing chunks. Deleting...") - delete_matching_chunks(matching_chunks) - else: - st.write("No existing chunks found.") - - # Stage 5: Chunk and Embed into Weaviate - with st.spinner("Chunking and embedding text into Weaviate..."): - tokenize_and_embed_text(clean_file_name, metadata) - st.success("Document processed and embedded successfully!") - progress_bar.progress(100) + if matching_chunks: + st.write(f"Found {len(matching_chunks)} existing chunks. 
Deleting...") + delete_matching_chunks(matching_chunks) + else: + st.write("No existing chunks found.") + + # Stage 5: Chunk and Embed into Weaviate + with st.spinner("Chunking and embedding text into Weaviate..."): + tokenize_and_embed_text(clean_file_name, metadata) + st.success("Document processed and embedded successfully!") + progress_bar.progress(100) + + except Exception as e: + st.error(f"Error processing file: {e}") # Navigation buttons col1, col2 = st.columns([1, 1]) @@ -242,58 +415,70 @@ def upload_files_page(): if st.button("View Documents"): st.session_state.page = "view" -# View Documents Page + def view_documents_page(): - st.title("Uploaded Documents") + st.title("View Uploaded Files") + + # Sidebar Configuration + st.sidebar.header("View Documents Configuration") + + # Collapsible section to display Saved API Keys + with st.sidebar.expander("Saved API Keys", expanded=False): + st.subheader("API Keys in Use") + if "api_keys" in st.session_state: + api_keys = st.session_state["api_keys"] + st.markdown(f""" + - **OpenAI API Key**: {api_keys.get("OPENAI_API_KEY", "Not Set")} + - **OpenAI Base URL**: {api_keys.get("OPENAI_BASE_URL", "Not Set")} + - **Weaviate URL**: {api_keys.get("WEAVIATE_URL", "Not Set")} + - **Weaviate API Key**: {api_keys.get("WEAVIATE_API_KEY", "Not Set")} + - **AssemblyAI API Key**: {api_keys.get("ASSEMBLY_AI_KEY", "Not Set")} + - **Azure Connection String**: {api_keys.get("AZURE_STORAGE_CONNECTION_STRING", "Not Set")} + - **Azure Container Name**: {api_keys.get("AZURE_STORAGE_CONTAINER", "Not Set")} + """) + else: + st.warning("No API keys found. Please configure them on the Home Page.") + + # Fetch files and group them by folder and date try: - # Fetch blobs from each folder - raw_blobs = list_blobs_in_folder("raw") - dirty_blobs = list_blobs_in_folder("dirty") - clean_blobs = list_blobs_in_folder("clean") - - def group_blobs_by_date(blobs): - """Groups blobs by their date extracted from the file name.""" - grouped = {} - for blob in blobs: - try: - file_name = blob.split("/")[-1] # Extract the file name - parts = file_name.split("_") # Split into parts: ['2023', '12', '12', 'BOC', 'Agenda', ...] 
- date_str = "_".join(parts[:3]) # Join the first three parts: '2023_12_12' - readable_date = datetime.strptime(date_str, "%Y_%m_%d").strftime("%B %d, %Y") - if readable_date not in grouped: - grouped[readable_date] = [] - grouped[readable_date].append(blob) - except (ValueError, IndexError): - if "Unknown Date" not in grouped: - grouped["Unknown Date"] = [] - grouped["Unknown Date"].append(blob) - return grouped - - raw_grouped = group_blobs_by_date(raw_blobs) - dirty_grouped = group_blobs_by_date(dirty_blobs) - clean_grouped = group_blobs_by_date(clean_blobs) - - def display_grouped_blobs(grouped_blobs, category): - if grouped_blobs: - st.subheader(f"{category.capitalize()} Documents") - for date, blobs in grouped_blobs.items(): - with st.expander(f"Date: {date}", expanded=False): - for blob in blobs: - st.write(f"- {blob}") - if st.button(f"Download {blob}", key=f"download_{category}_{blob}"): - file_content = download_from_azure(category, blob) - st.download_button("Download", data=file_content, file_name=blob) - else: - st.info(f"No documents found in the {category} category.") - - display_grouped_blobs(raw_grouped, "raw") - display_grouped_blobs(dirty_grouped, "dirty") - display_grouped_blobs(clean_grouped, "clean") + raw_files = list_blobs_in_folder("raw") + dirty_files = list_blobs_in_folder("dirty") + clean_files = list_blobs_in_folder("clean") + + def display_grouped_files(folder_name, grouped_files): + """ + Display grouped files by date for a specific folder. + + Args: + folder_name (str): The name of the folder (raw, dirty, clean). + grouped_files (dict): Dictionary of grouped files by date. + """ + with st.expander(f"{folder_name.capitalize()} Files", expanded=False): + for date, files in grouped_files.items(): + st.markdown(f"**Date: {date}**") + for file_path in files: + file_name = file_path.split("/")[-1] + if st.button(f"Download {file_name}", key=f"{folder_name}_{file_name}_button"): + try: + file_content = download_from_azure(folder_name, file_name) + st.download_button( + label=f"Download {file_name}", + data=file_content, + file_name=file_name, + key=f"download_{folder_name}_{file_name}" + ) + except Exception as e: + st.error(f"Error downloading {file_name}: {e}") + + # Display files for each folder + display_grouped_files("clean", clean_files) + display_grouped_files("dirty", dirty_files) + display_grouped_files("raw", raw_files) except Exception as e: - st.error(f"Error fetching documents from Azure Blob Storage: {e}") + st.error(f"Error fetching files from Azure Blob Storage: {e}") - # Navigation buttons + # Navigation Buttons col1, col2 = st.columns([1, 1]) with col1: if st.button("Return Home"): diff --git a/Preprocessing/docker/requirements.txt b/Preprocessing/docker/requirements.txt index e90a5e3e..02da6eae 100644 --- a/Preprocessing/docker/requirements.txt +++ b/Preprocessing/docker/requirements.txt @@ -23,4 +23,5 @@ azure.storage.blob transformers chardet pytest -easyocr \ No newline at end of file +easyocr +tiktoken \ No newline at end of file diff --git a/Preprocessing/preprocessing_pipeline/audio_transcription.py b/Preprocessing/preprocessing_pipeline/audio_transcription.py index 9d836030..6fb3c0de 100644 --- a/Preprocessing/preprocessing_pipeline/audio_transcription.py +++ b/Preprocessing/preprocessing_pipeline/audio_transcription.py @@ -1,11 +1,15 @@ -import os import requests +import streamlit as st from utils.azure_blob_utils import download_from_azure -from utils.env_setup import load_env -# Load environment variables -load_env() -ASSEMBLY_AI_KEY = 
os.getenv("ASSEMBLY_AI_KEY") +# Dynamically fetch AssemblyAI API key from Streamlit session state +def get_assembly_ai_key(): + api_keys = st.session_state.get("api_keys", {}) + assembly_ai_key = api_keys.get("ASSEMBLY_AI_KEY") + if not assembly_ai_key: + raise ValueError("AssemblyAI API key is missing. Please configure it in the Streamlit app.") + return assembly_ai_key + ASSEMBLY_AI_ENDPOINT = "https://api.assemblyai.com/v2" def transcribe_audio(raw_file_name, model=None, speaker_labels=False): @@ -20,8 +24,11 @@ def transcribe_audio(raw_file_name, model=None, speaker_labels=False): Returns: - str: Transcribed text, or None if transcription fails. """ - headers = {"authorization": ASSEMBLY_AI_KEY} try: + # Fetch the AssemblyAI key dynamically + assembly_ai_key = get_assembly_ai_key() + headers = {"authorization": assembly_ai_key} + # Step 1: Download the raw audio file from Azure raw_content = download_from_azure("raw", raw_file_name, as_text=False) print(f"Downloaded {raw_file_name} from Azure for transcription.") @@ -82,3 +89,4 @@ def transcribe_audio(raw_file_name, model=None, speaker_labels=False): except Exception as e: print(f"Error during transcription: {e}") return None + diff --git a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py index 380d96cd..37107c9a 100644 --- a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py +++ b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py @@ -1,29 +1,35 @@ -import os -from openai import OpenAI +import streamlit as st +import requests import weaviate import tiktoken # Use tiktoken for OpenAI-compatible tokenization -from utils.env_setup import load_env from utils.azure_blob_utils import download_from_azure -# Load environment variables -load_env() -WEAVIATE_URL = os.getenv("WEAVIATE_URL") -WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY") -OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +# Dynamic API Key Retrieval +def get_weaviate_client(): + api_keys = st.session_state.get("api_keys", {}) + weaviate_url = api_keys.get("WEAVIATE_URL") + weaviate_api_key = api_keys.get("WEAVIATE_API_KEY") -# Initialize Weaviate client -client = weaviate.Client( - url=WEAVIATE_URL, - auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY) + if not weaviate_url or not weaviate_api_key: + raise ValueError("Weaviate API configuration is missing. Please configure it in the Streamlit app.") + + return weaviate.Client( + url=weaviate_url, + auth_client_secret=weaviate.AuthApiKey(api_key=weaviate_api_key) ) -# Initialize OpenAI client for embedding -openai_client = OpenAI(api_key=OPENAI_API_KEY) +def get_openai_api_key(): + api_keys = st.session_state.get("api_keys", {}) + openai_api_key = api_keys.get("OPENAI_API_KEY") + + if not openai_api_key: + raise ValueError("OpenAI API key is missing. Please configure it in the Streamlit app.") + + return openai_api_key # Initialize tiktoken for OpenAI's embedding model tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002") - def fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document): """ Fetch matching chunks from Weaviate based on metadata. @@ -37,6 +43,7 @@ def fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document Returns: list: A list of matching documents. """ + client = get_weaviate_client() query = f""" {{ Get {{ @@ -67,6 +74,7 @@ def delete_matching_chunks(documents): Args: documents (list): List of documents with IDs to delete. 
""" + client = get_weaviate_client() for doc in documents: doc_id = doc["_additional"]["id"] client.data_object.delete(doc_id) @@ -83,6 +91,10 @@ def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250): max_chunk_size (int): Maximum token size for each chunk. """ try: + # Initialize clients dynamically + client = get_weaviate_client() + openai_api_key = get_openai_api_key() + # Download cleaned text from Azure clean_text = download_from_azure("clean", clean_file_name) tokens = tokenizer.encode(clean_text) @@ -107,8 +119,17 @@ def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250): # Embed and upload each chunk for i, chunk in enumerate(chunks): - response = openai_client.embeddings.create(input=chunk, model="text-embedding-ada-002") - embedding = response.data[0].embedding + # Request embedding from OpenAI + headers = {"Authorization": f"Bearer {openai_api_key}"} + response = requests.post( + "https://api.openai.com/v1/embeddings", + headers=headers, + json={"input": chunk, "model": "text-embedding-ada-002"} + ) + if response.status_code != 200: + raise ValueError(f"OpenAI embedding error: {response.status_code} - {response.text}") + + embedding = response.json()["data"][0]["embedding"] client.data_object.create( data_object={ diff --git a/Preprocessing/preprocessing_pipeline/pdf_conversion.py b/Preprocessing/preprocessing_pipeline/pdf_conversion.py index 7b3c7499..d9c7e77c 100644 --- a/Preprocessing/preprocessing_pipeline/pdf_conversion.py +++ b/Preprocessing/preprocessing_pipeline/pdf_conversion.py @@ -3,8 +3,10 @@ from PIL import Image from io import BytesIO import numpy as np +import streamlit as st from utils.azure_blob_utils import download_from_azure + def convert_pdf_to_text(raw_file_name): """ Extracts text from a PDF file. Uses EasyOCR as a fallback for scanned PDFs. @@ -17,33 +19,34 @@ def convert_pdf_to_text(raw_file_name): """ try: # Step 1: Download the raw file from Azure Blob Storage + print(f"Downloading {raw_file_name} from Azure Blob Storage (raw folder)...") raw_content = download_from_azure("raw", raw_file_name, as_text=False) - # Step 2: Open the PDF content + # Step 2: Open the PDF content using PyMuPDF (fitz) pdf_document = fitz.open(stream=raw_content, filetype="pdf") - text = "" - reader = easyocr.Reader(['en']) # Initialize EasyOCR for English + text = "" # Initialize a string to hold extracted text + reader = easyocr.Reader(['en'], gpu=False) # Initialize EasyOCR for English (disable GPU for portability) for page_num in range(pdf_document.page_count): page = pdf_document[page_num] - # Attempt to extract text directly + # Attempt to extract text directly from the page page_text = page.get_text() - if page_text.strip(): # If direct text is available - print(f"Text extracted directly from page {page_num + 1}.") + if page_text.strip(): # If direct text extraction is successful + print(f"Direct text extracted from page {page_num + 1}.") text += page_text else: # Fallback to OCR for scanned pages - print(f"Applying OCR on page {page_num + 1} of {raw_file_name}.") - pix = page.get_pixmap(dpi=300) # Render page to an image - img = Image.open(BytesIO(pix.tobytes("png"))) + print(f"Direct text extraction failed on page {page_num + 1}. 
Applying OCR.") + pix = page.get_pixmap(dpi=300) # Render the page as a high-resolution image + img = Image.open(BytesIO(pix.tobytes("png"))) # Convert rendered image to a PIL Image img_array = np.array(img) # Convert PIL Image to NumPy array for EasyOCR - ocr_text = reader.readtext(img_array, detail=0) # Extract text with EasyOCR - text += "\n".join(ocr_text) + ocr_text = reader.readtext(img_array, detail=0) # Perform OCR with EasyOCR + text += "\n".join(ocr_text) # Append the OCR results to the text string - pdf_document.close() + pdf_document.close() # Close the PDF document print(f"Successfully extracted text from {raw_file_name}.") return text except Exception as e: - print(f"Error in OCR for {raw_file_name}: {e}") + print(f"Error processing PDF {raw_file_name}: {e}") return None diff --git a/Preprocessing/preprocessing_pipeline/text_cleaning.py b/Preprocessing/preprocessing_pipeline/text_cleaning.py index a9912220..d4f3f3b5 100644 --- a/Preprocessing/preprocessing_pipeline/text_cleaning.py +++ b/Preprocessing/preprocessing_pipeline/text_cleaning.py @@ -1,16 +1,28 @@ -import os +import streamlit as st +import tiktoken # For OpenAI-compatible tokenization from openai import OpenAI -import tiktoken # Use tiktoken for OpenAI-compatible tokenization -from utils.env_setup import load_env -from utils.azure_blob_utils import download_from_azure, upload_to_azure - -# Load environment variables -load_env() -client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) +from utils.azure_blob_utils import download_from_azure # Initialize tiktoken for OpenAI's GPT models tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo") # Specify the OpenAI model + +def get_openai_client(): + """ + Retrieves the OpenAI client using the API key from Streamlit session state. + + Returns: + OpenAI: OpenAI client object. + """ + api_keys = st.session_state.get("api_keys", {}) + openai_api_key = api_keys.get("OPENAI_API_KEY") + + if not openai_api_key: + raise ValueError("OpenAI API Key is missing. Please configure it on the Home Page.") + + return OpenAI(api_key=openai_api_key) + + def tokenize_and_split_text(text, max_chunk_size=250): """ Tokenizes and splits text into smaller chunks within the token size limit. @@ -22,6 +34,10 @@ def tokenize_and_split_text(text, max_chunk_size=250): Returns: list of str: List of smaller text chunks. """ + # Validate text input + if not text or text.strip() == "": + raise ValueError("Text input is empty or invalid.") + # Tokenize the text into tokens tokens = tokenizer.encode(text) @@ -32,12 +48,14 @@ def tokenize_and_split_text(text, max_chunk_size=250): ] return chunks -def clean_text_chunk(chunk): + +def clean_text_chunk(chunk, openai_client): """ Cleans a single chunk of text using OpenAI GPT. Args: chunk (str): Text chunk to clean. + openai_client (OpenAI): OpenAI client instance. Returns: str: Cleaned text. 
@@ -51,13 +69,18 @@ def clean_text_chunk(chunk): {"role": "user", "content": f"Clean the following text for readability: {chunk}"} ] - response = client.chat.completions.create( - model="gpt-3.5-turbo", - messages=messages, - max_tokens=2000, - temperature=0.5 - ) - return response.choices[0].message.content.strip() + try: + response = openai_client.chat.completions.create( + model="gpt-3.5-turbo", + messages=messages, + max_tokens=2000, + temperature=0.5 + ) + return response.choices[0].message.content.strip() + except Exception as e: + print(f"Error during chunk cleaning: {e}") + return f"Error in chunk cleaning: {e}" + def clean_text(dirty_file_name): """ @@ -69,16 +92,40 @@ def clean_text(dirty_file_name): Returns: str: Combined cleaned text. """ - print(f"Downloading {dirty_file_name} from Azure Blob Storage...") - dirty_content = download_from_azure("dirty", dirty_file_name) - - # Tokenize and split the text into chunks of 250 tokens - chunks = tokenize_and_split_text(dirty_content, max_chunk_size=250) - cleaned_chunks = [] - - for i, chunk in enumerate(chunks): - print(f"Cleaning chunk {i + 1}/{len(chunks)}...") - cleaned_chunk = clean_text_chunk(chunk) - cleaned_chunks.append(cleaned_chunk) - - return "\n\n".join(cleaned_chunks) + try: + print(f"Downloading {dirty_file_name} from Azure Blob Storage (dirty folder)...") + dirty_content = download_from_azure("dirty", dirty_file_name) + + # Validate dirty content + if not dirty_content or dirty_content.strip() == "": + raise ValueError("The downloaded content is empty. Please check the file content.") + + # Initialize OpenAI client dynamically + openai_client = get_openai_client() + + # Tokenize and split the text into chunks + print("Tokenizing and splitting text into manageable chunks...") + chunks = tokenize_and_split_text(dirty_content, max_chunk_size=250) + cleaned_chunks = [] + + for i, chunk in enumerate(chunks): + print(f"Cleaning chunk {i + 1}/{len(chunks)}: {chunk[:100]}...") + try: + cleaned_chunk = clean_text_chunk(chunk, openai_client) + except Exception as e: + print(f"Error cleaning chunk {i + 1}: {e}") + cleaned_chunk = f"Error cleaning this chunk: {e}" + + if not cleaned_chunk.strip(): + print(f"Chunk {i + 1} returned empty after cleaning.") + raise ValueError(f"Chunk {i + 1} cleaning failed. Received empty content.") + + cleaned_chunks.append(cleaned_chunk) + + print(f"Successfully cleaned {len(chunks)} chunks.") + return "\n\n".join(cleaned_chunks) + + except Exception as e: + print(f"Error during text cleaning: {e}") + return None + diff --git a/Preprocessing/utils/azure_blob_utils.py b/Preprocessing/utils/azure_blob_utils.py index 34dd3569..fea990aa 100644 --- a/Preprocessing/utils/azure_blob_utils.py +++ b/Preprocessing/utils/azure_blob_utils.py @@ -1,64 +1,125 @@ from azure.storage.blob import BlobServiceClient -import os -from dotenv import load_dotenv import chardet -load_dotenv() # Load environment variables from .env file +import streamlit as st -# Set up the blob service client -connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING") -container_name = os.getenv("AZURE_STORAGE_CONTAINER") -blob_service_client = BlobServiceClient.from_connection_string(connection_string) -container_client = blob_service_client.get_container_client(container_name) +def get_blob_service_clients(): + """ + Initializes the Azure Blob Service Client and Container Client dynamically from `st.session_state`. 
+ + Returns: + tuple: (BlobServiceClient, ContainerClient) + """ + try: + api_keys = st.session_state.get("api_keys", {}) + connection_string = api_keys.get("AZURE_STORAGE_CONNECTION_STRING") + container_name = api_keys.get("AZURE_STORAGE_CONTAINER") + + if not connection_string: + raise ValueError("Azure Storage Connection String is missing. Please set it on the Home Page.") + if not container_name: + raise ValueError("Azure Storage Container Name is missing. Please set it on the Home Page.") + + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + container_client = blob_service_client.get_container_client(container_name) + return blob_service_client, container_client + except Exception as e: + print(f"Error initializing Azure Blob Service or Container Client: {e}") + raise e + +def list_blobs_in_folder(folder_name): + """ + List all blobs in a specific folder in Azure Blob Storage. + + Args: + folder_name (str): The folder to list blobs from. + + Returns: + dict: Dictionary where keys are dates, and values are lists of blob names for that date. + """ + try: + _, container_client = get_blob_service_clients() + blobs = container_client.list_blobs(name_starts_with=f"{folder_name}/") + grouped_blobs = {} + + for blob in blobs: + file_name = blob.name.split("/")[-1] # Extract the file name + if not file_name: # Skip empty folder paths + continue + parts = file_name.split("_")[:3] # Extract the date (e.g., 2023_11_14) + if len(parts) == 3: + date_key = "_".join(parts) # Format: YYYY_MM_DD + else: + date_key = "Unknown Date" + grouped_blobs.setdefault(date_key, []).append(blob.name) + + return grouped_blobs + except Exception as e: + print(f"Error listing blobs in folder {folder_name}: {e}") + raise e def upload_to_azure(folder_name, file_name, file_content): """ - Upload a file to Azure Blob Storage. + Uploads a file to a specified folder in Azure Blob Storage. Args: folder_name (str): The folder in the Azure container (e.g., raw, dirty, clean). file_name (str): The name of the file to upload. file_content (bytes): The binary content of the file to upload. + + Returns: + str: Success message with the uploaded file path. """ - blob_name = f"{folder_name}/{file_name}" - blob_client = container_client.get_blob_client(blob_name) - blob_client.upload_blob(file_content, overwrite=True) - print(f"Uploaded to Azure: {blob_name}") + try: + # Validate inputs + if not folder_name or not file_name: + raise ValueError("Folder name and file name cannot be empty.") + if not file_content: + raise ValueError("File content is empty or None.") + + # Initialize Azure Blob Service clients + _, container_client = get_blob_service_clients() + + # Construct the blob path + blob_name = f"{folder_name}/{file_name}" + blob_client = container_client.get_blob_client(blob_name) + + # Upload the file, overwriting if it already exists + blob_client.upload_blob(file_content, overwrite=True) + print(f"Successfully uploaded {file_name} to Azure at {blob_name}.") + return f"File successfully uploaded to: {blob_name}" + except Exception as e: + print(f"Error uploading {file_name} to Azure: {e}") + raise Exception(f"Failed to upload file {file_name}: {e}") + def download_from_azure(folder_name, file_name, as_text=True): """ - Download a file from Azure Blob Storage with streaming. - """ - blob_name = f"{folder_name}/{file_name}" - blob_client = container_client.get_blob_client(blob_name) + Download a file from Azure Blob Storage. 
- # Print the URL for debugging - print(f"Generated Blob URL: {blob_client.url}") + Args: + folder_name (str): The folder in the Azure container (e.g., clean, dirty, raw). + file_name (str): The name of the file to download. + as_text (bool): Whether to decode the file content as text or return binary content. + Returns: + str or bytes: The content of the file as text or binary. + """ try: + _, container_client = get_blob_service_clients() + blob_name = f"{folder_name}/{file_name}" + blob_client = container_client.get_blob_client(blob_name) downloader = blob_client.download_blob(max_concurrency=5) + if as_text: - # Read as binary first and detect encoding + # Read as binary first and detect encoding for text decoding raw_data = downloader.readall() detected_encoding = chardet.detect(raw_data)['encoding'] print(f"Detected encoding: {detected_encoding}") return raw_data.decode(detected_encoding) # Decode using detected encoding else: - print(f"Downloading {blob_name} as binary.") - return downloader.readall() # Return binary content + return downloader.readall() # Return binary content if `as_text` is False + except Exception as e: print(f"Error downloading blob {blob_name}: {e}") raise e - -def list_blobs_in_folder(folder_name): - """ - List all blobs in a specific folder in Azure Blob Storage. - - Args: - folder_name (str): The folder to list blobs from. - - Returns: - list: List of blob names. - """ - blobs = container_client.list_blobs(name_starts_with=f"{folder_name}/") - return [blob.name for blob in blobs] diff --git a/Preprocessing/utils/env_setup.py b/Preprocessing/utils/env_setup.py deleted file mode 100644 index acc1b513..00000000 --- a/Preprocessing/utils/env_setup.py +++ /dev/null @@ -1,12 +0,0 @@ -import os -import sys -from dotenv import load_dotenv - -def load_env(): - """ - Loads environment variables from a .env file and adds PYTHONPATH. - """ - load_dotenv() - python_path = os.getenv("PYTHONPATH") - if python_path: - sys.path.append(python_path) diff --git a/Preprocessing/utils/file_utils.py b/Preprocessing/utils/file_utils.py deleted file mode 100644 index e69de29b..00000000 diff --git a/Preprocessing/utils/metadata_utils.py b/Preprocessing/utils/metadata_utils.py deleted file mode 100644 index e69de29b..00000000
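
The [global] and [home] tables added to config.toml (pageTitle, favicon, welcomeMessage) are not part of Streamlit's documented configuration, and theme.font is documented to honor only the generic families ("sans serif", "serif", "monospace"), so "IBM Plex Mono" may be ignored without CSS injection. A hedged sketch of the documented way to get the same effect from inside the app, reusing the strings and favicon path from this patch:

import streamlit as st

# Page title and favicon are set per-app via st.set_page_config, not config.toml.
st.set_page_config(
    page_title="Minute Mate",
    page_icon="assets/favicon.ico",  # favicon path taken from the patch's config
)

# The welcome banner is ordinary page content rather than a config key.
st.markdown("Welcome to Minute Mate: Your Meeting Transcription and Summarization Tool!")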
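
Both text_cleaning.py and chunking_vector_embedding.py slice token streams into 250-token windows with tiktoken. A self-contained sketch of that mechanic on illustrative filler text (the helper name split_into_chunks is mine, not the patch's):

import tiktoken

tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")

def split_into_chunks(text, max_chunk_size=250):
    tokens = tokenizer.encode(text)
    # Non-overlapping windows of at most max_chunk_size tokens, decoded back to text.
    return [
        tokenizer.decode(tokens[i:i + max_chunk_size])
        for i in range(0, len(tokens), max_chunk_size)
    ]

sample = "word " * 600  # roughly 600 tokens of filler
chunks = split_into_chunks(sample)
print(len(chunks))  # -> 3 (about 250 + 250 + 100 tokens)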
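
chunking_vector_embedding.py now posts to the embeddings endpoint with requests instead of the OpenAI SDK; note that it targets the fixed https://api.openai.com/v1 URL, so the OPENAI_BASE_URL collected on the Home Page is not consulted here. A minimal sketch of the same call (the timeout is my addition, not in the patch):

import requests

def embed_chunk(chunk, openai_api_key):
    response = requests.post(
        "https://api.openai.com/v1/embeddings",
        headers={"Authorization": f"Bearer {openai_api_key}"},
        json={"input": chunk, "model": "text-embedding-ada-002"},
        timeout=30,  # fail fast on a hung connection (added here for safety)
    )
    if response.status_code != 200:
        raise ValueError(f"OpenAI embedding error: {response.status_code} - {response.text}")
    # Documented response shape: data[0].embedding is a list of floats.
    return response.json()["data"][0]["embedding"]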
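
The per-page logic in pdf_conversion.py reads the embedded text layer first and rasterizes only pages that lack one. The same flow condensed into a standalone sketch (minutes.pdf is a hypothetical local file for the demo):

import fitz  # PyMuPDF
import numpy as np
import easyocr
from PIL import Image
from io import BytesIO

def page_text_or_ocr(page, reader):
    text = page.get_text()
    if text.strip():
        return text  # embedded text layer present; no OCR needed
    pix = page.get_pixmap(dpi=300)  # render the scanned page at high resolution
    img = Image.open(BytesIO(pix.tobytes("png")))
    return "\n".join(reader.readtext(np.array(img), detail=0))

reader = easyocr.Reader(["en"], gpu=False)
with fitz.open("minutes.pdf") as doc:  # hypothetical input file
    print("".join(page_text_or_ocr(page, reader) for page in doc))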
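
download_from_azure decodes text blobs with whatever encoding chardet reports. chardet.detect can return None for short or ambiguous input, so the sketch below falls back to UTF-8 (the fallback is my addition; the patch decodes with the detected value directly):

import chardet

def decode_detected(raw_data):
    detected = chardet.detect(raw_data)["encoding"] or "utf-8"
    return raw_data.decode(detected)

sample = "Réunion du conseil municipal: compte rendu approuvé.".encode("latin-1")
print(decode_detected(sample))  # typically detected as a Latin-1 family encoding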
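
list_blobs_in_folder keys each blob by the YYYY_MM_DD prefix of its file name and buckets everything else under "Unknown Date". A standalone sketch of that convention; the all(...isdigit()) guard is a tightening not present in the patch, which keys any three-part name as a date:

def group_by_date(blob_names):
    grouped = {}
    for name in blob_names:
        parts = name.split("/")[-1].split("_")[:3]
        if len(parts) == 3 and all(p.isdigit() for p in parts):
            key = "_".join(parts)  # e.g. "2023_11_14"
        else:
            key = "Unknown Date"
        grouped.setdefault(key, []).append(name)
    return grouped

print(group_by_date(["raw/2023_11_14_BOC_Agenda_Raw.pdf", "raw/notes.txt"]))
# {'2023_11_14': ['raw/2023_11_14_BOC_Agenda_Raw.pdf'], 'Unknown Date': ['raw/notes.txt']}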