From 3555cd1b3fdb4153d07e6f1eab0261942e661bf3 Mon Sep 17 00:00:00 2001
From: Riley LePrell <150089004+RileyLePrell@users.noreply.github.com>
Date: Wed, 13 Nov 2024 14:40:49 -0500
Subject: [PATCH 1/5] Staff Facing App + Preprocessing Pipeline

- Preliminary outline of the Staff Facing App; it includes three pages:
  1. Home Page - Intro to the website.
  2. Upload Files - Where staff can upload audio recordings, minutes, or agendas.
  3. View Documents - Allows staff to view what's currently in the cloud.

I've created a preprocessing pipeline as well:
  1. Users input metadata before uploading a file.
  2. Audio files are transcribed by AssemblyAI into a dirty text file; minutes and agendas are converted from PDF to a dirty text file.
  3. Uses OpenAI + prompt engineering to clean the text files.
  4. Chunks and tokenizes the cleaned text into vectors.
  5. Stores the vectors in Weaviate Cloud.
---
 Preprocessing/{App => }/.env.example          |   0
 Preprocessing/App/main.py                     | 307 ++++++++++++++++++
 Preprocessing/App/requirements.txt            |  11 -
 Preprocessing/Dockerfile                      |   6 -
 Preprocessing/docker/Dockerfile               |  20 ++
 Preprocessing/docker/requirements.txt         |  20 ++
 .../preprocessing_pipeline/__init__.py        |   0
 .../audio_transcription.py                    |  55 ++++
 .../chunking_tokenization.py                  |  14 +
 .../preprocessing_pipeline/pdf_conversion.py  |  10 +
 .../preprocessing_pipeline/text_cleaning.py   |  35 ++
 .../vector_embedding.py                       |  33 ++
 Preprocessing/utils/env_setup.py              |  12 +
 Preprocessing/utils/file_utils.py             |   0
 Preprocessing/utils/metadata_utils.py         |   0
 15 files changed, 506 insertions(+), 17 deletions(-)
 rename Preprocessing/{App => }/.env.example (100%)
 create mode 100644 Preprocessing/App/main.py
 delete mode 100644 Preprocessing/App/requirements.txt
 delete mode 100644 Preprocessing/Dockerfile
 create mode 100644 Preprocessing/docker/Dockerfile
 create mode 100644 Preprocessing/docker/requirements.txt
 create mode 100644 Preprocessing/preprocessing_pipeline/__init__.py
 create mode 100644 Preprocessing/preprocessing_pipeline/audio_transcription.py
 create mode 100644 Preprocessing/preprocessing_pipeline/chunking_tokenization.py
 create mode 100644 Preprocessing/preprocessing_pipeline/pdf_conversion.py
 create mode 100644 Preprocessing/preprocessing_pipeline/text_cleaning.py
 create mode 100644 Preprocessing/preprocessing_pipeline/vector_embedding.py
 create mode 100644 Preprocessing/utils/env_setup.py
 create mode 100644 Preprocessing/utils/file_utils.py
 create mode 100644 Preprocessing/utils/metadata_utils.py

diff --git a/Preprocessing/App/.env.example b/Preprocessing/.env.example
similarity index 100%
rename from Preprocessing/App/.env.example
rename to Preprocessing/.env.example
diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py
new file mode 100644
index 00000000..3396a92e
--- /dev/null
+++ b/Preprocessing/App/main.py
@@ -0,0 +1,307 @@
+import os
+import sys
+from dotenv import load_dotenv
+from datetime import datetime
+
+# Load environment variables from .env
+load_dotenv()
+
+# Set PYTHONPATH from .env if available
+python_path = os.getenv("PYTHONPATH")
+if python_path:
+    sys.path.append(python_path)
+
+
+import streamlit as st
+import weaviate  # Import Weaviate client
+from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text
+from preprocessing_pipeline.audio_transcription import transcribe_audio
+from preprocessing_pipeline.text_cleaning import clean_text
+from preprocessing_pipeline.chunking_tokenization import process_text_chunks
+from preprocessing_pipeline.vector_embedding import embed_text
+
+# Set up Weaviate client
+client = weaviate.Client(
+    url=os.getenv("WEAVIATE_URL"),
+    auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY"))
+)
+
+# Fetch documents from Weaviate
+def fetch_uploaded_documents():
+    # Query Weaviate for documents
+    query = """
+    {
+      Get {
+        Documents {
+          file_name
+          file_type
+          meeting_date
+          meeting_type
+          clean_text
+          chunks
+        }
+      }
+    }
+    """
+    response = client.query.raw(query)
+    documents = response.get("data", {}).get("Get", {}).get("Documents", [])
+    return documents
+
+# Define pages
+def home_page():
+    # Apply custom styling with IBM Plex Mono
+    st.markdown(f"""
+    """, unsafe_allow_html=True)
+
+    st.markdown(f"""
+        Minute Mate
+
+        Welcome to Minute Mate; this is a staff-level application to upload meeting audios, minutes, and agendas to provide further context to the front end.
+    """, unsafe_allow_html=True)
+
+    # Navigation buttons (centered)
+    col1, col2 = st.columns([1, 1])
+
+    with col1:
+        if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"):
+            st.session_state.page = "upload"
+
+    with col2:
+        if st.button("View Documents", key="view", help="View the documents that have been uploaded"):
+            st.session_state.page = "view"
+
+def upload_files_page():
+    st.title("Upload Municipal Meeting Documents")
+
+    # Sidebar for metadata and options selection
+    st.sidebar.header("Document Metadata & Transcription Options")
+
+    # Metadata Input Fields
+    meeting_date = st.sidebar.date_input("Select Meeting Date", datetime.today())
+    meeting_type = st.sidebar.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"])
+    file_type = st.sidebar.radio("File Type", ["Agenda", "Minutes", "Audio"])
+
+    # Transcription Model and Language Options
+    model_option = st.sidebar.selectbox("Select Transcription Model", ["default", "best", "nano"])
+    speaker_labels = st.sidebar.checkbox("Enable Speaker Diarization")
+
+    # Save Metadata Button
+    if st.sidebar.button("Save Metadata"):
+        st.session_state["metadata"] = {
+            "meeting_date": meeting_date,
+            "meeting_type": meeting_type,
+            "file_type": file_type,
+            "model": model_option,
+            "speaker_labels": speaker_labels
+        }
+
+    st.header("Upload New Document")
+    file = st.file_uploader("Choose a file to upload", type=["pdf", "mp3", "wav"])
+
+    # Initialize progress bar
+    progress_bar = st.progress(0)
+
+    if file and "metadata" in st.session_state:
+        metadata = st.session_state["metadata"]
+        progress_bar.progress(10)
+        st.write("Stage: Metadata Saved")
+
+        if metadata["file_type"] in ["Agenda", "Minutes"] and file.type == "application/pdf":
+            # Stage: PDF to Dirty Text Conversion
+            with st.spinner("Converting PDF to text..."):
+                dirty_text = convert_pdf_to_text(file)
+
+            pdf_text_path = "pdf_text_output.txt"
+            with open(pdf_text_path, "w") as f:
+                f.write(dirty_text)
+
+            progress_bar.progress(30)
+            st.write("Stage: PDF Conversion Complete")
+
+            # Display and download PDF text conversion
+            st.text_area("Converted PDF Text:", dirty_text, height=200)
+            st.download_button("Download PDF Text", data=dirty_text, file_name=pdf_text_path)
+
+        elif metadata["file_type"] == "Audio" and file.type in ["audio/mpeg", "audio/wav"]:
+            # Stage: Audio Transcription with selected model and speaker labels
+            with st.spinner(f"Transcribing audio using {metadata['model']} model..."):
+                dirty_text = transcribe_audio(file, model=metadata["model"], speaker_labels=metadata["speaker_labels"])
+
+            transcription_path = "transcription_output.txt"
+            with open(transcription_path, "w") as f:
+                f.write(dirty_text)
+
+            progress_bar.progress(30)
+            st.write("Stage: Audio Transcription Complete")
+
+            # Display and download transcription
+            st.text_area("Audio Transcription:", dirty_text, height=200)
+            st.download_button("Download Transcription", data=dirty_text, file_name=transcription_path)
+
+        # Continue processing if dirty_text was successfully created
+        if dirty_text:
+            # Stage: Text Cleaning
+            with st.spinner("Cleaning text with generative AI..."):
+                partly_clean_text = clean_text(dirty_text)
+
+            cleaned_text_path = "cleaned_text_output.txt"
+            with open(cleaned_text_path, "w") as f:
+                f.write(partly_clean_text)
+
+            progress_bar.progress(60)
+            st.write("Stage: Text Cleaning Complete")
+
+            # Display and download cleaned text
+            st.text_area("Cleaned Text:", partly_clean_text, height=200)
+            st.download_button("Download Cleaned Text", data=partly_clean_text, file_name=cleaned_text_path)
+
+            # Stage: Chunking and Tokenization
+            with st.spinner("Chunking and tokenizing text..."):
+                text_chunks = process_text_chunks(partly_clean_text)
+                progress_bar.progress(80)
+                st.write("Stage: Chunking and Tokenization Complete")
+
+            # Stage: Embedding and Storage
+            with st.spinner("Embedding and storing in Weaviate..."):
+                embed_text(text_chunks, metadata)
+                progress_bar.progress(100)
+                st.write("Stage: Embedding and Storage Complete")
+
+            st.success("Document processed and embedded with metadata!")
+        else:
+            st.error("Failed to process the document.")
+
+    # Navigation buttons (centered)
+    col1, col2 = st.columns([1, 1])
+    with col1:
+        if st.button("Return Home"):
+            st.session_state.page = "home"
+    with col2:
+        if st.button("View Documents"):
+            st.session_state.page = "view"
+
+def view_documents_page():
+    st.title("Uploaded Documents")
+
+    # Retrieve Weaviate URL and API Key from environment variables
+    weaviate_url = os.getenv("WEAVIATE_URL")
+    weaviate_api_key = os.getenv("WEAVIATE_API_KEY")
+
+    if not weaviate_url or not weaviate_api_key:
+        st.error("Weaviate connection details (URL or API Key) are missing.")
+        return
+
+    # Initialize Weaviate client with API key for authentication
+    client = weaviate.Client(
+        url=weaviate_url,
+        auth_client_secret=weaviate_api_key
+    )
+
+    # Fetch all objects from Weaviate
+    try:
+        # Get all objects from the collection (assuming "Documents" is the name of your collection)
+        result = client.data_object.get(class_name="Documents", properties=["file_name", "file_type", "meeting_date", "meeting_type", "clean_text", "chunks"])
+
+        if result['objects']:
+            for item in result['objects']:
+                file_name = item['properties'].get('file_name', 'N/A')
+                file_type = item['properties'].get('file_type', 'N/A')
+                meeting_date = item['properties'].get('meeting_date', 'N/A')
+                meeting_type = item['properties'].get('meeting_type', 'N/A')
+                clean_text = item['properties'].get('clean_text', 'No clean text available')
+                chunks = item['properties'].get('chunks', 'No chunks available')
+
+                # Display the document details in Streamlit
+                st.subheader(f"Document: {file_name}")
+                st.write(f"**File Type:** {file_type}")
+                st.write(f"**Meeting Date:** {meeting_date}")
+                st.write(f"**Meeting Type:** {meeting_type}")
+                st.write(f"**Clean Text:** {clean_text[:300]}...")  # Show a preview of the clean text
+                st.write(f"**Chunks:** {chunks[:300]}...")  # Show a preview of the chunks
+                st.write("---")
+        else:
+            st.write("No documents found in the Weaviate database.")
+    except Exception as e:
+        st.error(f"Error fetching documents from Weaviate: {e}")
+
+    # Navigation buttons (centered)
+    col1, col2 = st.columns([1, 1])
+    with col1:
+        if st.button("Return Home"):
+            st.session_state.page = "home"
+    with col2:
+        if st.button("Upload Files"):
+            st.session_state.page = "upload"
+    # Navigation buttons (centered)
+    col1, col2 = st.columns([1, 1])
+    with col1:
+        if st.button("Return Home"):
+            st.session_state.page = "home"
+    with col2:
+        if st.button("Upload Files"):
+            st.session_state.page = "upload"
+
+# Main page selection
+if "page" not in st.session_state:
+    st.session_state.page = "home"
+
+if st.session_state.page == "home":
+    home_page()
+elif st.session_state.page == "upload":
+    upload_files_page()
+elif st.session_state.page == "view":
+    view_documents_page()
diff --git a/Preprocessing/App/requirements.txt b/Preprocessing/App/requirements.txt
deleted file mode 100644
index 00de1808..00000000
--- a/Preprocessing/App/requirements.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-# Environmental variables
-python-dotenv==1.0.0
-
-# Audio transcription
-assemblyai==0.35.1
-
-# Vector database
-weaviate-client==4.7.1
-
-# Embedding and generation services
-openai==1.54.3
\ No newline at end of file
diff --git a/Preprocessing/Dockerfile b/Preprocessing/Dockerfile
deleted file mode 100644
index 31e6b89b..00000000
--- a/Preprocessing/Dockerfile
+++ /dev/null
@@ -1,6 +0,0 @@
-FROM python:3.11-slim
-WORKDIR /App
-COPY . /App
-RUN python -m pip install -r requirements.txt
-EXPOSE 8000
-CMD [" ", "start","--port","8001","--host","0.0.0.0"]
\ No newline at end of file
diff --git a/Preprocessing/docker/Dockerfile b/Preprocessing/docker/Dockerfile
new file mode 100644
index 00000000..83035bc5
--- /dev/null
+++ b/Preprocessing/docker/Dockerfile
@@ -0,0 +1,20 @@
+# Dockerfile to set up the environment for Streamlit app
+
+# Use Python base image
+FROM python:3.9-slim
+
+# Set working directory
+WORKDIR /app
+
+# Copy requirements file and install dependencies
+COPY docker/requirements.txt .
+RUN pip install -r requirements.txt
+
+# Copy the application files
+COPY . .
+
+# Expose port for Streamlit
+EXPOSE 8501
+
+# Run the Streamlit application
+CMD ["streamlit", "run", "app/main.py", "--server.port=8501", "--server.address=0.0.0.0"]
diff --git a/Preprocessing/docker/requirements.txt b/Preprocessing/docker/requirements.txt
new file mode 100644
index 00000000..4e43ca41
--- /dev/null
+++ b/Preprocessing/docker/requirements.txt
@@ -0,0 +1,20 @@
+# Environment variables management
+python-dotenv==1.0.0
+
+# Audio transcription
+assemblyai==0.35.1
+
+# Vector database (Weaviate client)
+weaviate-client==4.7.1
+
+# Embedding and generation services
+openai==1.54.3
+
+# Streamlit for web UI
+streamlit==1.16.0
+
+# PDF handling (for PDF to text conversion)
+PyMuPDF==1.18.19
+
+# OpenAI API client (if you're using GPT models for text cleaning)
+openai==1.54.3
diff --git a/Preprocessing/preprocessing_pipeline/__init__.py b/Preprocessing/preprocessing_pipeline/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/Preprocessing/preprocessing_pipeline/audio_transcription.py b/Preprocessing/preprocessing_pipeline/audio_transcription.py
new file mode 100644
index 00000000..b98f64c4
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/audio_transcription.py
@@ -0,0 +1,55 @@
+import assemblyai as aai
+import os
+from utils.env_setup import load_env
+
+# Load environment variables
+load_env()
+ASSEMBLY_AI_KEY = os.getenv("ASSEMBLY_AI_KEY")
+aai.settings.api_key = ASSEMBLY_AI_KEY
+transcriber = aai.Transcriber()
+
+def transcribe_audio(file, model="best", speaker_labels=False):
+    """
+    Transcribes an audio file using AssemblyAI with specified model and speaker labels option.
+
+    Parameters:
+    - file: file-like object, the uploaded audio file to transcribe
+    - model: str, transcription model to use ("best" or "nano")
+    - speaker_labels: bool, whether to enable speaker diarization
+
+    Returns:
+    - str: Transcribed text
+    """
+    # Map the model selection to AssemblyAI's model classes
+    if model == "nano":
+        speech_model = aai.SpeechModel.nano
+    else:
+        speech_model = aai.SpeechModel.best
+
+    # Save the file temporarily for the SDK to access
+    temp_file_path = "temp_audio_file.wav"  # You can choose a unique name or path
+    with open(temp_file_path, "wb") as f:
+        f.write(file.read())
+
+    # Create the transcription configuration with model and speaker labels
+    config = aai.TranscriptionConfig(
+        speech_model=speech_model,
+        speaker_labels=speaker_labels
+    )
+
+    # Transcribe the audio
+    try:
+        transcript = transcriber.transcribe(temp_file_path, config)
+    except aai.TranscriptionError as e:
+        print(f"Transcription failed: {e}")
+        return None
+
+    if transcript.status == aai.TranscriptStatus.error:
+        print(f"Transcription error: {transcript.error}")
+        return None
+
+    # Clean up the temporary file
+    os.remove(temp_file_path)
+
+    # Return the transcribed text
+    return transcript.text
diff --git a/Preprocessing/preprocessing_pipeline/chunking_tokenization.py b/Preprocessing/preprocessing_pipeline/chunking_tokenization.py
new file mode 100644
index 00000000..1e4f5150
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/chunking_tokenization.py
@@ -0,0 +1,14 @@
+def process_text_chunks(clean_text, chunk_size=500):
+    """
+    Splits cleaned text into chunks of specified size.
+
+    Parameters:
+    - clean_text: str, the text to split
+    - chunk_size: int, the number of words per chunk
+
+    Returns:
+    - list of str: List of text chunks
+    """
+    words = clean_text.split()
+    chunks = [" ".join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)]
+    return chunks
diff --git a/Preprocessing/preprocessing_pipeline/pdf_conversion.py b/Preprocessing/preprocessing_pipeline/pdf_conversion.py
new file mode 100644
index 00000000..e5fff39f
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/pdf_conversion.py
@@ -0,0 +1,10 @@
+import fitz  # PyMuPDF
+
+def convert_pdf_to_text(file):
+    text = ""
+    pdf_document = fitz.open(stream=file.read(), filetype="pdf")
+    for page_num in range(pdf_document.page_count):
+        page = pdf_document[page_num]
+        text += page.get_text()
+    pdf_document.close()
+    return text
diff --git a/Preprocessing/preprocessing_pipeline/text_cleaning.py b/Preprocessing/preprocessing_pipeline/text_cleaning.py
new file mode 100644
index 00000000..d10bf283
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/text_cleaning.py
@@ -0,0 +1,35 @@
+import os
+from openai import OpenAI
+from utils.env_setup import load_env
+
+# Load environment variables
+load_env()
+
+# Initialize the OpenAI client
+client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
+
+def clean_text(dirty_text):
+    # Context to guide the model
+    context_prompt = (
+        "The following text is a transcription of a municipal meeting for the town of Cramerton. "
+        "The transcription quality may be poor, and some words, like the town's name, Cramerton, "
+        "may not have been transcribed correctly. If you encounter words that seem out of place or incorrect, "
+        "please correct them based on this context."
+    )
+
+    # Full prompt including context and original transcription
+    messages = [
+        {"role": "system", "content": context_prompt},
+        {"role": "user", "content": f"Clean the following text for readability and correct errors: {dirty_text}"}
+    ]
+
+    # Create a chat completion with the specified model
+    response = client.chat.completions.create(
+        model="gpt-4",  # Specify the model, e.g., gpt-4 or gpt-3.5-turbo
+        messages=messages,
+        max_tokens=500,
+        temperature=0.5
+    )
+
+    # Extract and return the response text
+    return response.choices[0].message.content.strip()
diff --git a/Preprocessing/preprocessing_pipeline/vector_embedding.py b/Preprocessing/preprocessing_pipeline/vector_embedding.py
new file mode 100644
index 00000000..bfee2e82
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/vector_embedding.py
@@ -0,0 +1,33 @@
+import weaviate
+import os
+from utils.env_setup import load_env
+
+# Load environment variables
+load_env()
+WEAVIATE_URL = os.getenv("WEAVIATE_URL")
+WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
+
+# Set up the Weaviate client with API key authentication
+client = weaviate.Client(
+    url=WEAVIATE_URL,
+    auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
+)
+
+def embed_text(chunks, metadata):
+    """
+    Embeds text chunks and stores them in Weaviate with metadata.
+
+    Parameters:
+    - chunks: list of str, text chunks to embed
+    - metadata: dict, metadata associated with each chunk
+    """
+    for chunk in chunks:
+        client.data_object.create(
+            data_object={
+                "content": chunk,
+                "meeting_date": str(metadata["meeting_date"]),
+                "meeting_type": metadata["meeting_type"],
+                "file_type": metadata["file_type"]
+            },
+            class_name="MeetingDocument"
+        )
diff --git a/Preprocessing/utils/env_setup.py b/Preprocessing/utils/env_setup.py
new file mode 100644
index 00000000..acc1b513
--- /dev/null
+++ b/Preprocessing/utils/env_setup.py
@@ -0,0 +1,12 @@
+import os
+import sys
+from dotenv import load_dotenv
+
+def load_env():
+    """
+    Loads environment variables from a .env file and adds PYTHONPATH.
+    """
+    load_dotenv()
+    python_path = os.getenv("PYTHONPATH")
+    if python_path:
+        sys.path.append(python_path)
diff --git a/Preprocessing/utils/file_utils.py b/Preprocessing/utils/file_utils.py
new file mode 100644
index 00000000..e69de29b
diff --git a/Preprocessing/utils/metadata_utils.py b/Preprocessing/utils/metadata_utils.py
new file mode 100644
index 00000000..e69de29b

From c194a6f532a08b6ae3aaa2758e43c60fd83fa18e Mon Sep 17 00:00:00 2001
From: Riley LePrell <150089004+RileyLePrell@users.noreply.github.com>
Date: Wed, 13 Nov 2024 14:57:51 -0500
Subject: [PATCH 2/5] Update main.py

---
 Preprocessing/App/main.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py
index 3396a92e..3ef27610 100644
--- a/Preprocessing/App/main.py
+++ b/Preprocessing/App/main.py
@@ -1,7 +1,9 @@
+# Standard Python imports
 import os
 import sys
+
+# Load environment variables and set Python path
 from dotenv import load_dotenv
-from datetime import datetime
 
 # Load environment variables from .env
 load_dotenv()
@@ -11,7 +13,8 @@
 if python_path:
     sys.path.append(python_path)
 
-
+# Now import all other dependencies
+from datetime import datetime
 import streamlit as st
 import weaviate  # Import Weaviate client
 from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text

From 305cbb28228e385a876db647f76e3cd807dcbb4a Mon Sep 17 00:00:00 2001
From: Riley LePrell
Date: Sun, 17 Nov 2024 19:44:32 -0500
Subject: [PATCH 3/5] Cloud Functionality

- Created Azure Blob storage; Streamlit interacts with it and stores files at different stages of the preprocessing pipeline.
- Pipeline is up and running.
- View Documents page to see files uploaded.
---
 Preprocessing/App/main.py                     | 237 +++++++++---------
 Preprocessing/docker/requirements.txt         |   8 +-
 .../audio_transcription.py                    |  74 +++---
 .../chunking_tokenization.py                  |  14 --
 .../chunking_vector_embedding.py              |  78 ++++++
 .../preprocessing_pipeline/pdf_conversion.py  |  37 ++-
 .../preprocessing_pipeline/text_cleaning.py   |  85 +++++--
 .../vector_embedding.py                       |  33 ---
 Preprocessing/utils/azure_blob_utils.py       |  64 +++++
 9 files changed, 398 insertions(+), 232 deletions(-)
 delete mode 100644 Preprocessing/preprocessing_pipeline/chunking_tokenization.py
 create mode 100644 Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py
 delete mode 100644 Preprocessing/preprocessing_pipeline/vector_embedding.py
 create mode 100644 Preprocessing/utils/azure_blob_utils.py

diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py
index 3ef27610..488734d1 100644
--- a/Preprocessing/App/main.py
+++ b/Preprocessing/App/main.py
@@ -20,8 +20,9 @@
 from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text
 from preprocessing_pipeline.audio_transcription import transcribe_audio
 from preprocessing_pipeline.text_cleaning import clean_text
-from preprocessing_pipeline.chunking_tokenization import process_text_chunks
-from preprocessing_pipeline.vector_embedding import embed_text
+from preprocessing_pipeline.chunking_vector_embedding import process_and_embed_text
+from utils.azure_blob_utils import upload_to_azure, download_from_azure
+from utils.azure_blob_utils import list_blobs_in_folder, download_from_azure
 
 # Set up Weaviate client
 client = weaviate.Client(
@@ -29,6 +30,20 @@
     auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY"))
 )
 
+# Generate standardized file names
+def generate_file_name(metadata, stage):
+    meeting_date = metadata["meeting_date"].strftime("%Y_%m_%d")
+    meeting_type = "BOC" if metadata["meeting_type"] == "Board of Commissioners" else "PB"
+    file_type = metadata["file_type"]
+    return f"{meeting_date}_{meeting_type}_{file_type}_{stage}"
+
+# Check and overwrite files in the local storage
+def save_file_with_overwrite(file_path, content):
+    if os.path.exists(file_path):
+        os.remove(file_path)  # Overwrite existing file
+    with open(file_path, "w") as f:
+        f.write(content)
+
 # Fetch documents from Weaviate
 def fetch_uploaded_documents():
     # Query Weaviate for documents
@@ -127,22 +142,19 @@ def home_page():
         if st.button("View Documents", key="view", help="View the documents that have been uploaded"):
             st.session_state.page = "view"
 
+# Define pages
 def upload_files_page():
     st.title("Upload Municipal Meeting Documents")
 
     # Sidebar for metadata and options selection
     st.sidebar.header("Document Metadata & Transcription Options")
-
-    # Metadata Input Fields
     meeting_date = st.sidebar.date_input("Select Meeting Date", datetime.today())
     meeting_type = st.sidebar.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"])
     file_type = st.sidebar.radio("File Type", ["Agenda", "Minutes", "Audio"])
-
-    # Transcription Model and Language Options
    model_option = st.sidebar.selectbox("Select Transcription Model", ["default", "best", "nano"])
     speaker_labels = st.sidebar.checkbox("Enable Speaker Diarization")
 
-    # Save Metadata Button
+    # Save metadata
     if st.sidebar.button("Save Metadata"):
         st.session_state["metadata"] = {
             "meeting_date": meeting_date,
@@ -160,73 +172,64 @@ def upload_files_page():
 
     if file and "metadata" in st.session_state:
         metadata = st.session_state["metadata"]
-        progress_bar.progress(10)
-        st.write("Stage: Metadata Saved")
-
-        if metadata["file_type"] in ["Agenda", "Minutes"] and file.type == "application/pdf":
-            # Stage: PDF to Dirty Text Conversion
-            with st.spinner("Converting PDF to text..."):
-                dirty_text = convert_pdf_to_text(file)
-
-            pdf_text_path = "pdf_text_output.txt"
-            with open(pdf_text_path, "w") as f:
-                f.write(dirty_text)
-
-            progress_bar.progress(30)
-            st.write("Stage: PDF Conversion Complete")
-
-            # Display and download PDF text conversion
-            st.text_area("Converted PDF Text:", dirty_text, height=200)
-            st.download_button("Download PDF Text", data=dirty_text, file_name=pdf_text_path)
-
-        elif metadata["file_type"] == "Audio" and file.type in ["audio/mpeg", "audio/wav"]:
-            # Stage: Audio Transcription with selected model and speaker labels
+
+        # Preserve the original file extension
+        file_extension = os.path.splitext(file.name)[1]
+        raw_file_name = f"{generate_file_name(metadata, 'Raw')}{file_extension}"
+
+        # Stage 1: Upload to Raw
+        upload_to_azure("raw", raw_file_name, file.read())
+        st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}")
+        progress_bar.progress(20)
+
+        # Stage 2: Process based on file type
+        if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]:
+            # Transcribe audio
             with st.spinner(f"Transcribing audio using {metadata['model']} model..."):
-                dirty_text = transcribe_audio(file, model=metadata["model"], speaker_labels=metadata["speaker_labels"])
-
-            transcription_path = "transcription_output.txt"
-            with open(transcription_path, "w") as f:
-                f.write(dirty_text)
-
-            progress_bar.progress(30)
-            st.write("Stage: Audio Transcription Complete")
-
-            # Display and download transcription
-            st.text_area("Audio Transcription:", dirty_text, height=200)
-            st.download_button("Download Transcription", data=dirty_text, file_name=transcription_path)
-
-        # Continue processing if dirty_text was successfully created
-        if dirty_text:
-            # Stage: Text Cleaning
-            with st.spinner("Cleaning text with generative AI..."):
-                partly_clean_text = clean_text(dirty_text)
-
-            cleaned_text_path = "cleaned_text_output.txt"
-            with open(cleaned_text_path, "w") as f:
-                f.write(partly_clean_text)
-
-            progress_bar.progress(60)
-            st.write("Stage: Text Cleaning Complete")
-
-            # Display and download cleaned text
-            st.text_area("Cleaned Text:", partly_clean_text, height=200)
-            st.download_button("Download Cleaned Text", data=partly_clean_text, file_name=cleaned_text_path)
-
-            # Stage: Chunking and Tokenization
-            with st.spinner("Chunking and tokenizing text..."):
-                text_chunks = process_text_chunks(partly_clean_text)
-                progress_bar.progress(80)
-                st.write("Stage: Chunking and Tokenization Complete")
-
-            # Stage: Embedding and Storage
-            with st.spinner("Embedding and storing in Weaviate..."):
-                embed_text(text_chunks, metadata)
-                progress_bar.progress(100)
-                st.write("Stage: Embedding and Storage Complete")
-
-            st.success("Document processed and embedded with metadata!")
-        else:
-            st.error("Failed to process the document.")
+            transcribed_text = transcribe_audio(
+                raw_file_name=raw_file_name,
+                model=metadata["model"],
+                speaker_labels=metadata["speaker_labels"]
+            )
+            if transcribed_text:
+                dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt"
+                upload_to_azure("dirty", dirty_file_name, transcribed_text)
+                st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}")
+                st.text_area("Transcribed Audio Text:", transcribed_text, height=200)
+                st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name)
+            else:
+                st.error("Failed to transcribe the audio.")
+
+        elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf":
+            # Extract text from PDF
+            with st.spinner("Extracting text from PDF..."):
+                extracted_text = convert_pdf_to_text(raw_file_name)
+            if extracted_text:
+                dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt"
+                upload_to_azure("dirty", dirty_file_name, extracted_text)
+                st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}")
+                st.text_area("Extracted PDF Text:", extracted_text, height=200)
+                st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name)
+            else:
+                st.error("Failed to extract text from the PDF.")
+
+        # Stage 3: Clean Text and Upload to Clean
+        dirty_content = download_from_azure("dirty", dirty_file_name)
+        with st.spinner("Cleaning text using generative AI..."):
+            cleaned_text = clean_text(dirty_file_name)  # Updated to handle chunked cleaning
+        clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt"
+        upload_to_azure("clean", clean_file_name, cleaned_text)
+        st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}")
+
+        # Display cleaned text
+        st.text_area("Cleaned Text:", cleaned_text, height=200)
+        st.download_button("Download Cleaned Text", data=cleaned_text, file_name=clean_file_name)
+
+        # Stage 4: Chunk and Embed into Weaviate
+        with st.spinner("Chunking and embedding text into Weaviate..."):
+            process_and_embed_text(clean_file_name, metadata)  # Call the combined chunking and embedding function
+            st.success("Document processed and embedded successfully!")
+            progress_bar.progress(100)
 
     # Navigation buttons (centered)
     col1, col2 = st.columns([1, 1])
@@ -237,50 +240,51 @@ def upload_files_page():
         if st.button("View Documents"):
             st.session_state.page = "view"
 
+# Define the view_documents_page function
 def view_documents_page():
     st.title("Uploaded Documents")
 
-    # Retrieve Weaviate URL and API Key from environment variables
-    weaviate_url = os.getenv("WEAVIATE_URL")
-    weaviate_api_key = os.getenv("WEAVIATE_API_KEY")
-
-    if not weaviate_url or not weaviate_api_key:
-        st.error("Weaviate connection details (URL or API Key) are missing.")
-        return
-
-    # Initialize Weaviate client with API key for authentication
-    client = weaviate.Client(
-        url=weaviate_url,
-        auth_client_secret=weaviate_api_key
-    )
-
-    # Fetch all objects from Weaviate
+    # Fetch files from the Azure Blob Storage
     try:
-        # Get all objects from the collection (assuming "Documents" is the name of your collection)
-        result = client.data_object.get(class_name="Documents", properties=["file_name", "file_type", "meeting_date", "meeting_type", "clean_text", "chunks"])
-
-        if result['objects']:
-            for item in result['objects']:
-                file_name = item['properties'].get('file_name', 'N/A')
-                file_type = item['properties'].get('file_type', 'N/A')
-                meeting_date = item['properties'].get('meeting_date', 'N/A')
-                meeting_type = item['properties'].get('meeting_type', 'N/A')
-                clean_text = item['properties'].get('clean_text', 'No clean text available')
-                chunks = item['properties'].get('chunks', 'No chunks available')
-
-                # Display the document details in Streamlit
-                st.subheader(f"Document: {file_name}")
-                st.write(f"**File Type:** {file_type}")
-                st.write(f"**Meeting Date:** {meeting_date}")
-                st.write(f"**Meeting Type:** {meeting_type}")
-                st.write(f"**Clean Text:** {clean_text[:300]}...")  # Show a preview of the clean text
-                st.write(f"**Chunks:** {chunks[:300]}...")  # Show a preview of the chunks
-                st.write("---")
-        else:
-            st.write("No documents found in the Weaviate database.")
-    except Exception as e:
-        st.error(f"Error fetching documents from Weaviate: {e}")
+        # List blobs in the 'raw', 'dirty', and 'clean' folders
+        raw_blobs = list_blobs_in_folder("raw")
+        dirty_blobs = list_blobs_in_folder("dirty")
+        clean_blobs = list_blobs_in_folder("clean")
+
+        # Display documents from 'raw' folder
+        if raw_blobs:
+            st.subheader("Raw Documents")
+            for blob in raw_blobs:
+                st.write(f"- {blob}")
+                if st.button(f"Download {blob}", key=f"download_raw_{blob}"):
+                    file_content = download_from_azure("raw", blob)
+                    st.download_button("Download", data=file_content, file_name=blob)
+
+        # Display documents from 'dirty' folder
+        if dirty_blobs:
+            st.subheader("Dirty Documents")
+            for blob in dirty_blobs:
+                st.write(f"- {blob}")
+                if st.button(f"Download {blob}", key=f"download_dirty_{blob}"):
+                    file_content = download_from_azure("dirty", blob)
+                    st.download_button("Download", data=file_content, file_name=blob)
+
+        # Display documents from 'clean' folder
+        if clean_blobs:
+            st.subheader("Clean Documents")
+            for blob in clean_blobs:
+                st.write(f"- {blob}")
+                if st.button(f"Download {blob}", key=f"download_clean_{blob}"):
+                    file_content = download_from_azure("clean", blob)
+                    st.download_button("Download", data=file_content, file_name=blob)
+
+        # If no files are found in any folder
+        if not raw_blobs and not dirty_blobs and not clean_blobs:
+            st.write("No documents found in the Azure Blob Storage.")
+
+    except Exception as e:
+        st.error(f"Error fetching documents from Azure Blob Storage: {e}")
+
     # Navigation buttons (centered)
     col1, col2 = st.columns([1, 1])
@@ -289,14 +293,7 @@ def view_documents_page():
     with col2:
         if st.button("Upload Files"):
             st.session_state.page = "upload"
-    # Navigation buttons (centered)
-    col1, col2 = st.columns([1, 1])
-    with col1:
-        if st.button("Return Home"):
-            st.session_state.page = "home"
-    with col2:
-        if st.button("Upload Files"):
-            st.session_state.page = "upload"
+
 
 # Main page selection
 if "page" not in st.session_state:
     st.session_state.page = "home"
diff --git a/Preprocessing/docker/requirements.txt b/Preprocessing/docker/requirements.txt
index 4e43ca41..03fcd3cd 100644
--- a/Preprocessing/docker/requirements.txt
+++ b/Preprocessing/docker/requirements.txt
@@ -11,10 +11,10 @@ weaviate-client==4.7.1
 openai==1.54.3
 
 # Streamlit for web UI
-streamlit==1.16.0
+streamlit
 
 # PDF handling (for PDF to text conversion)
-PyMuPDF==1.18.19
+PyMuPDF
 
-# OpenAI API client (if you're using GPT models for text cleaning)
-openai==1.54.3
+# azure portal
+azure.storage.blob
\ No newline at end of file
diff --git a/Preprocessing/preprocessing_pipeline/audio_transcription.py b/Preprocessing/preprocessing_pipeline/audio_transcription.py
index b98f64c4..fbe4f57e 100644
--- a/Preprocessing/preprocessing_pipeline/audio_transcription.py
+++ b/Preprocessing/preprocessing_pipeline/audio_transcription.py
@@ -1,4 +1,5 @@
 import assemblyai as aai
+from utils.azure_blob_utils import download_from_azure  # For Azure integration
 import os
 from utils.env_setup import load_env
 
@@ -8,48 +9,47 @@
 aai.settings.api_key = ASSEMBLY_AI_KEY
 transcriber = aai.Transcriber()
 
-def transcribe_audio(file, model="best", speaker_labels=False):
+def transcribe_audio(raw_file_name, model="best", speaker_labels=False):
     """
-    Transcribes an audio file using AssemblyAI with specified model and speaker labels option.
+    Transcribes an audio file using AssemblyAI directly from Azure Blob Storage.
 
     Parameters:
-    - file: file-like object, the uploaded audio file to transcribe
-    - model: str, transcription model to use ("best" or "nano")
-    - speaker_labels: bool, whether to enable speaker diarization
+    - raw_file_name (str): Name of the raw file in Azure Blob Storage to transcribe.
+    - model (str): Transcription model to use ("best" or "nano").
+    - speaker_labels (bool): Whether to enable speaker diarization.
 
     Returns:
-    - str: Transcribed text
+    - str: Transcribed text, or None if transcription fails.
""" - # Map the model selection to AssemblyAI's model classes - if model == "nano": - speech_model = aai.SpeechModel.nano - else: - speech_model = aai.SpeechModel.best - - # Save the file temporarily for the SDK to access - temp_file_path = "temp_audio_file.wav" # You can choose a unique name or path - with open(temp_file_path, "wb") as f: - f.write(file.read()) - - # Create the transcription configuration with model and speaker labels - config = aai.TranscriptionConfig( - speech_model=speech_model, - speaker_labels=speaker_labels - ) - - # Transcribe the audio try: - transcript = transcriber.transcribe(temp_file_path, config) - except aai.TranscriptionError as e: - print(f"Transcription failed: {e}") + # Step 1: Download raw audio from Azure Blob Storage + raw_content = download_from_azure("raw", raw_file_name, as_text=False) + print(f"Downloaded {raw_file_name} from Azure for transcription.") + + # Step 2: Map transcription model + if model == "nano": + speech_model = aai.SpeechModel.nano + else: + speech_model = aai.SpeechModel.best + + # Step 3: Configure transcription + config = aai.TranscriptionConfig( + speech_model=speech_model, + speaker_labels=speaker_labels + ) + + # Step 4: Start transcription + print("Starting transcription...") + transcript = transcriber.transcribe_audio_bytes(raw_content, config) + + # Step 5: Handle response + if transcript.status == aai.TranscriptStatus.error: + print(f"Transcription error: {transcript.error}") + return None + + print("Transcription completed successfully.") + return transcript.text + + except Exception as e: + print(f"Error during transcription: {e}") return None - - if transcript.status == aai.TranscriptStatus.error: - print(f"Transcription error: {transcript.error}") - return None - - # Clean up the temporary file - os.remove(temp_file_path) - - # Return the transcribed text - return transcript.text diff --git a/Preprocessing/preprocessing_pipeline/chunking_tokenization.py b/Preprocessing/preprocessing_pipeline/chunking_tokenization.py deleted file mode 100644 index 1e4f5150..00000000 --- a/Preprocessing/preprocessing_pipeline/chunking_tokenization.py +++ /dev/null @@ -1,14 +0,0 @@ -def process_text_chunks(clean_text, chunk_size=500): - """ - Splits cleaned text into chunks of specified size. - - Parameters: - - clean_text: str, the text to split - - chunk_size: int, the number of words per chunk - - Returns: - - list of str: List of text chunks - """ - words = clean_text.split() - chunks = [" ".join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)] - return chunks diff --git a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py new file mode 100644 index 00000000..4839f8a9 --- /dev/null +++ b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py @@ -0,0 +1,78 @@ +import weaviate +import os +from utils.env_setup import load_env +from utils.azure_blob_utils import download_from_azure + +# Load environment variables +load_env() +WEAVIATE_URL = os.getenv("WEAVIATE_URL") +WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY") + +# Set up the Weaviate client with API key authentication +client = weaviate.Client( + url=WEAVIATE_URL, + auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY) +) + +def process_and_embed_text(clean_file_name, metadata, chunk_size=500): + """ + Combines chunking and embedding into a single process: + - Splits the cleaned text into smaller chunks. + - Embeds each chunk into Weaviate with metadata. 
+
+    Parameters:
+    - clean_file_name: str, name of the cleaned text file in Azure Blob Storage (clean folder).
+    - metadata: dict, metadata associated with each chunk (e.g., meeting date, type, etc.).
+    - chunk_size: int, number of words per chunk.
+    """
+    try:
+        # Step 1: Download the cleaned text from Azure and chunk it
+        print(f"Downloading and chunking the text from {clean_file_name}...")
+        clean_text = download_from_azure("clean", clean_file_name)
+        words = clean_text.split()
+        chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
+
+        # Extract metadata
+        meeting_date = str(metadata["meeting_date"])
+        meeting_type = metadata["meeting_type"]
+        file_type = metadata["file_type"]
+
+        # Step 2: Check for existing documents in Weaviate with the same metadata and delete them
+        query = f"""
+        {{
+            Get {{
+                MeetingDocument(where: {{
+                    path: ["meeting_date", "meeting_type", "file_type"],
+                    operator: And,
+                    valueString: "{meeting_date}"
+                }}) {{
+                    id
+                }}
+            }}
+        }}
+        """
+        response = client.query.raw(query)
+        existing_documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", [])
+
+        # Step 3: Delete any existing documents with matching metadata in Weaviate
+        for doc in existing_documents:
+            client.data_object.delete(doc["id"])
+        print(f"Deleted {len(existing_documents)} existing documents with matching metadata.")
+
+        # Step 4: Embed and store new chunks in Weaviate
+        for chunk in chunks:
+            client.data_object.create(
+                data_object={
+                    "content": chunk,
+                    "meeting_date": meeting_date,
+                    "meeting_type": meeting_type,
+                    "file_type": file_type
+                },
+                class_name="MeetingDocument"
+            )
+            print(f"Embedded chunk for {clean_file_name} in Weaviate.")
+
+        print(f"Successfully embedded {len(chunks)} chunks for {clean_file_name}.")
+
+    except Exception as e:
+        print(f"Error during chunking and embedding: {e}")
diff --git a/Preprocessing/preprocessing_pipeline/pdf_conversion.py b/Preprocessing/preprocessing_pipeline/pdf_conversion.py
index e5fff39f..0e23c92a 100644
--- a/Preprocessing/preprocessing_pipeline/pdf_conversion.py
+++ b/Preprocessing/preprocessing_pipeline/pdf_conversion.py
@@ -1,10 +1,31 @@
 import fitz  # PyMuPDF
+from utils.azure_blob_utils import download_from_azure
 
-def convert_pdf_to_text(file):
-    text = ""
-    pdf_document = fitz.open(stream=file.read(), filetype="pdf")
-    for page_num in range(pdf_document.page_count):
-        page = pdf_document[page_num]
-        text += page.get_text()
-    pdf_document.close()
-    return text
+def convert_pdf_to_text(raw_file_name):
+    """
+    Extracts text from a PDF file.
+
+    Args:
+        raw_file_name (str): Name of the PDF file in Azure Blob Storage (raw folder).
+
+    Returns:
+        str: Extracted text from the PDF.
+    """
+    try:
+        # Step 1: Download the raw file from Azure Blob Storage
+        raw_content = download_from_azure("raw", raw_file_name, as_text=False)
+
+        # Step 2: Open the PDF content and extract text
+        text = ""
+        pdf_document = fitz.open(stream=raw_content, filetype="pdf")
+        for page_num in range(pdf_document.page_count):
+            page = pdf_document[page_num]
+            text += page.get_text()
+        pdf_document.close()
+
+        print(f"Successfully extracted text from {raw_file_name}.")
+        return text
+
+    except Exception as e:
+        print(f"Error extracting text from PDF {raw_file_name}: {e}")
+        return None
diff --git a/Preprocessing/preprocessing_pipeline/text_cleaning.py b/Preprocessing/preprocessing_pipeline/text_cleaning.py
index d10bf283..f162b8fa 100644
--- a/Preprocessing/preprocessing_pipeline/text_cleaning.py
+++ b/Preprocessing/preprocessing_pipeline/text_cleaning.py
@@ -1,35 +1,88 @@
 import os
 from openai import OpenAI
 from utils.env_setup import load_env
+from utils.azure_blob_utils import download_from_azure, upload_to_azure
 
 # Load environment variables
 load_env()
-
-# Initialize the OpenAI client
 client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
 
-def clean_text(dirty_text):
-    # Context to guide the model
+def split_text(text, max_chunk_size=3000):
+    """
+    Splits a large text into smaller chunks, each within the specified token size.
+
+    Args:
+        text (str): The text to split.
+        max_chunk_size (int): Maximum token size for each chunk.
+
+    Returns:
+        list of str: List of smaller text chunks.
+    """
+    chunks = []
+    words = text.split()
+    chunk = []
+    current_size = 0
+
+    for word in words:
+        current_size += len(word) + 1  # +1 accounts for spaces
+        if current_size > max_chunk_size:
+            chunks.append(" ".join(chunk))
+            chunk = []
+            current_size = len(word) + 1
+        chunk.append(word)
+
+    if chunk:
+        chunks.append(" ".join(chunk))
+
+    return chunks
+
+def clean_text_chunk(chunk):
+    """
+    Cleans a single chunk of text using OpenAI GPT.
+
+    Args:
+        chunk (str): Text chunk to clean.
+
+    Returns:
+        str: Cleaned text.
+    """
     context_prompt = (
         "The following text is a transcription of a municipal meeting for the town of Cramerton. "
-        "The transcription quality may be poor, and some words, like the town's name, Cramerton, "
-        "may not have been transcribed correctly. If you encounter words that seem out of place or incorrect, "
-        "please correct them based on this context."
+        "Please clean it for readability and correct any errors or inconsistencies."
     )
-
-    # Full prompt including context and original transcription
     messages = [
         {"role": "system", "content": context_prompt},
-        {"role": "user", "content": f"Clean the following text for readability and correct errors: {dirty_text}"}
+        {"role": "user", "content": f"Clean the following text for readability: {chunk}"}
     ]
-
-    # Create a chat completion with the specified model
+
     response = client.chat.completions.create(
-        model="gpt-4",  # Specify the model, e.g., gpt-4 or gpt-3.5-turbo
+        model="gpt-4",
         messages=messages,
-        max_tokens=500,
+        max_tokens=2000,
         temperature=0.5
     )
-
-    # Extract and return the response text
     return response.choices[0].message.content.strip()
+
+def clean_text(dirty_file_name):
+    """
+    Cleans the given text file by splitting it into smaller chunks and processing each chunk.
+
+    Args:
+        dirty_file_name (str): Name of the file in Azure Blob Storage (dirty folder).
+
+    Returns:
+        str: Combined cleaned text.
+ """ + print(f"Downloading {dirty_file_name} from Azure Blob Storage...") + dirty_content = download_from_azure("dirty", dirty_file_name) + + # Split the text into chunks + chunks = split_text(dirty_content, max_chunk_size=3000) + cleaned_chunks = [] + + for i, chunk in enumerate(chunks): + print(f"Cleaning chunk {i + 1}/{len(chunks)}...") + cleaned_chunk = clean_text_chunk(chunk) + cleaned_chunks.append(cleaned_chunk) + + return "\n\n".join(cleaned_chunks) diff --git a/Preprocessing/preprocessing_pipeline/vector_embedding.py b/Preprocessing/preprocessing_pipeline/vector_embedding.py deleted file mode 100644 index bfee2e82..00000000 --- a/Preprocessing/preprocessing_pipeline/vector_embedding.py +++ /dev/null @@ -1,33 +0,0 @@ -import weaviate -import os -from utils.env_setup import load_env - -# Load environment variables -load_env() -WEAVIATE_URL = os.getenv("WEAVIATE_URL") -WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY") - -# Set up the Weaviate client with API key authentication -client = weaviate.Client( - url=WEAVIATE_URL, - auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY) -) - -def embed_text(chunks, metadata): - """ - Embeds text chunks and stores them in Weaviate with metadata. - - Parameters: - - chunks: list of str, text chunks to embed - - metadata: dict, metadata associated with each chunk - """ - for chunk in chunks: - client.data_object.create( - data_object={ - "content": chunk, - "meeting_date": str(metadata["meeting_date"]), - "meeting_type": metadata["meeting_type"], - "file_type": metadata["file_type"] - }, - class_name="MeetingDocument" - ) diff --git a/Preprocessing/utils/azure_blob_utils.py b/Preprocessing/utils/azure_blob_utils.py new file mode 100644 index 00000000..34dd3569 --- /dev/null +++ b/Preprocessing/utils/azure_blob_utils.py @@ -0,0 +1,64 @@ +from azure.storage.blob import BlobServiceClient +import os +from dotenv import load_dotenv +import chardet +load_dotenv() # Load environment variables from .env file + +# Set up the blob service client +connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING") +container_name = os.getenv("AZURE_STORAGE_CONTAINER") +blob_service_client = BlobServiceClient.from_connection_string(connection_string) +container_client = blob_service_client.get_container_client(container_name) + +def upload_to_azure(folder_name, file_name, file_content): + """ + Upload a file to Azure Blob Storage. + + Args: + folder_name (str): The folder in the Azure container (e.g., raw, dirty, clean). + file_name (str): The name of the file to upload. + file_content (bytes): The binary content of the file to upload. + """ + blob_name = f"{folder_name}/{file_name}" + blob_client = container_client.get_blob_client(blob_name) + blob_client.upload_blob(file_content, overwrite=True) + print(f"Uploaded to Azure: {blob_name}") + +def download_from_azure(folder_name, file_name, as_text=True): + """ + Download a file from Azure Blob Storage with streaming. 
+    """
+    blob_name = f"{folder_name}/{file_name}"
+    blob_client = container_client.get_blob_client(blob_name)
+
+    # Print the URL for debugging
+    print(f"Generated Blob URL: {blob_client.url}")
+
+    try:
+        downloader = blob_client.download_blob(max_concurrency=5)
+        if as_text:
+            # Read as binary first and detect encoding
+            raw_data = downloader.readall()
+            detected_encoding = chardet.detect(raw_data)['encoding']
+            print(f"Detected encoding: {detected_encoding}")
+            return raw_data.decode(detected_encoding)  # Decode using detected encoding
+        else:
+            print(f"Downloading {blob_name} as binary.")
+            return downloader.readall()  # Return binary content
+    except Exception as e:
+        print(f"Error downloading blob {blob_name}: {e}")
+        raise e
+
+
+def list_blobs_in_folder(folder_name):
+    """
+    List all blobs in a specific folder in Azure Blob Storage.
+
+    Args:
+        folder_name (str): The folder to list blobs from.
+
+    Returns:
+        list: List of blob names.
+    """
+    blobs = container_client.list_blobs(name_starts_with=f"{folder_name}/")
+    return [blob.name for blob in blobs]

From e6bbcfd61efa3da3e15051de140c7b8c0734ab91 Mon Sep 17 00:00:00 2001
From: Riley LePrell
Date: Mon, 18 Nov 2024 20:09:01 -0500
Subject: [PATCH 4/5] Tokenization + Embedder

- Added tokenizer: tiktoken
- Chunks set to 250 tokens
- Embedder: "text-embedding-ada-002"
---
 Preprocessing/App/main.py                     |  4 +-
 Preprocessing/docker/requirements.txt         |  6 +-
 .../audio_transcription.py                    | 85 +++++++++++++------
 .../chunking_vector_embedding.py              | 72 ++++++++++------
 .../preprocessing_pipeline/text_cleaning.py   | 36 ++++----
 5 files changed, 128 insertions(+), 75 deletions(-)

diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py
index 488734d1..36d5f9a4 100644
--- a/Preprocessing/App/main.py
+++ b/Preprocessing/App/main.py
@@ -20,7 +20,7 @@
 from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text
 from preprocessing_pipeline.audio_transcription import transcribe_audio
 from preprocessing_pipeline.text_cleaning import clean_text
-from preprocessing_pipeline.chunking_vector_embedding import process_and_embed_text
+from preprocessing_pipeline.chunking_vector_embedding import tokenize_and_embed_text
 from utils.azure_blob_utils import upload_to_azure, download_from_azure
 from utils.azure_blob_utils import list_blobs_in_folder, download_from_azure
 
@@ -227,7 +227,7 @@
 
     # Stage 4: Chunk and Embed into Weaviate
     with st.spinner("Chunking and embedding text into Weaviate..."):
-        process_and_embed_text(clean_file_name, metadata)  # Call the combined chunking and embedding function
+        tokenize_and_embed_text(clean_file_name, metadata)  # Call the combined chunking and embedding function
         st.success("Document processed and embedded successfully!")
         progress_bar.progress(100)
diff --git a/Preprocessing/docker/requirements.txt b/Preprocessing/docker/requirements.txt
index 03fcd3cd..757357bb 100644
--- a/Preprocessing/docker/requirements.txt
+++ b/Preprocessing/docker/requirements.txt
@@ -17,4 +17,8 @@ streamlit
 PyMuPDF
 
 # azure portal
-azure.storage.blob
\ No newline at end of file
+azure.storage.blob
+
+transformers
+
+chardet
\ No newline at end of file
diff --git a/Preprocessing/preprocessing_pipeline/audio_transcription.py b/Preprocessing/preprocessing_pipeline/audio_transcription.py
index fbe4f57e..9d836030 100644
--- a/Preprocessing/preprocessing_pipeline/audio_transcription.py
+++ b/Preprocessing/preprocessing_pipeline/audio_transcription.py
@@ -1,54 +1,83 @@
-import assemblyai as aai
-from utils.azure_blob_utils import download_from_azure  # For Azure integration
 import os
+import requests
+from utils.azure_blob_utils import download_from_azure
 from utils.env_setup import load_env
 
 # Load environment variables
 load_env()
 ASSEMBLY_AI_KEY = os.getenv("ASSEMBLY_AI_KEY")
-aai.settings.api_key = ASSEMBLY_AI_KEY
-transcriber = aai.Transcriber()
+ASSEMBLY_AI_ENDPOINT = "https://api.assemblyai.com/v2"
 
-def transcribe_audio(raw_file_name, model="best", speaker_labels=False):
+def transcribe_audio(raw_file_name, model=None, speaker_labels=False):
     """
-    Transcribes an audio file using AssemblyAI directly from Azure Blob Storage.
+    Transcribes an audio file using AssemblyAI.
 
     Parameters:
-    - raw_file_name (str): Name of the raw file in Azure Blob Storage to transcribe.
-    - model (str): Transcription model to use ("best" or "nano").
+    - raw_file_name (str): Name of the raw file in Azure Blob Storage.
+    - model (str): Transcription model to use (not currently implemented in AssemblyAI).
     - speaker_labels (bool): Whether to enable speaker diarization.
 
     Returns:
     - str: Transcribed text, or None if transcription fails.
     """
+    headers = {"authorization": ASSEMBLY_AI_KEY}
     try:
-        # Step 1: Download raw audio from Azure Blob Storage
+        # Step 1: Download the raw audio file from Azure
         raw_content = download_from_azure("raw", raw_file_name, as_text=False)
         print(f"Downloaded {raw_file_name} from Azure for transcription.")
 
-        # Step 2: Map transcription model
-        if model == "nano":
-            speech_model = aai.SpeechModel.nano
-        else:
-            speech_model = aai.SpeechModel.best
-
-        # Step 3: Configure transcription
-        config = aai.TranscriptionConfig(
-            speech_model=speech_model,
-            speaker_labels=speaker_labels
+        # Step 2: Upload the audio file to AssemblyAI
+        print("Uploading audio file to AssemblyAI...")
+        upload_response = requests.post(
+            f"{ASSEMBLY_AI_ENDPOINT}/upload",
+            headers=headers,
+            data=raw_content
         )
+        if upload_response.status_code != 200:
+            print(f"Error uploading to AssemblyAI: {upload_response.status_code} - {upload_response.text}")
+            return None
+
+        upload_url = upload_response.json()["upload_url"]
+        print(f"File uploaded to AssemblyAI. URL: {upload_url}")
 
-        # Step 4: Start transcription
-        print("Starting transcription...")
-        transcript = transcriber.transcribe_audio_bytes(raw_content, config)
+        # Step 3: Request transcription
+        print("Requesting transcription from AssemblyAI...")
+        transcription_payload = {"audio_url": upload_url}
 
-        # Step 5: Handle response
-        if transcript.status == aai.TranscriptStatus.error:
-            print(f"Transcription error: {transcript.error}")
-            return None
+        if speaker_labels:
+            transcription_payload["speaker_labels"] = True
 
-        print("Transcription completed successfully.")
-        return transcript.text
+        transcription_response = requests.post(
+            f"{ASSEMBLY_AI_ENDPOINT}/transcript",
+            headers=headers,
+            json=transcription_payload
+        )
+        if transcription_response.status_code != 200:
+            print(f"Error submitting transcription request: {transcription_response.status_code} - {transcription_response.text}")
+            return None
+
+        transcription_id = transcription_response.json()["id"]
+        print(f"Transcription request submitted. ID: {transcription_id}")
ID: {transcription_id}") + + # Step 4: Poll for transcription result + while True: + status_response = requests.get( + f"{ASSEMBLY_AI_ENDPOINT}/transcript/{transcription_id}", + headers=headers + ) + status_response.raise_for_status() + data = status_response.json() + + if data["status"] == "completed": + print("Transcription completed successfully.") + return data["text"] + elif data["status"] == "failed": + print(f"Transcription failed: {data['error']}") + return None + else: + print("Transcription in progress... Retrying in 5 seconds.") + import time + time.sleep(5) except Exception as e: print(f"Error during transcription: {e}") diff --git a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py index 4839f8a9..6e88de4c 100644 --- a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py +++ b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py @@ -1,5 +1,7 @@ -import weaviate import os +from openai import OpenAI +import weaviate +import tiktoken # Use tiktoken for OpenAI-compatible tokenization from utils.env_setup import load_env from utils.azure_blob_utils import download_from_azure @@ -7,37 +9,50 @@ load_env() WEAVIATE_URL = os.getenv("WEAVIATE_URL") WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY") +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") -# Set up the Weaviate client with API key authentication +# Initialize Weaviate client client = weaviate.Client( url=WEAVIATE_URL, auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY) ) -def process_and_embed_text(clean_file_name, metadata, chunk_size=500): +# Initialize OpenAI client for embedding +openai_client = OpenAI(api_key=OPENAI_API_KEY) + +# Initialize tiktoken for OpenAI's embedding model +tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002") + +def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250): """ - Combines chunking and embedding into a single process: - - Splits the cleaned text into smaller chunks. - - Embeds each chunk into Weaviate with metadata. + Tokenizes, chunks, and embeds cleaned text into Weaviate. - Parameters: - - clean_file_name: str, name of the cleaned text file in Azure Blob Storage (clean folder). - - metadata: dict, metadata associated with each chunk (e.g., meeting date, type, etc.). - - chunk_size: int, number of words per chunk. + Args: + clean_file_name (str): Name of the cleaned text file in Azure Blob Storage (clean folder). + metadata (dict): Metadata associated with the file (meeting_date, meeting_type, file_type). + max_chunk_size (int): Maximum token size for each chunk. 
""" try: - # Step 1: Download the cleaned text from Azure and chunk it - print(f"Downloading and chunking the text from {clean_file_name}...") + # Step 1: Download cleaned text from Azure clean_text = download_from_azure("clean", clean_file_name) - words = clean_text.split() - chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)] + print(f"Downloaded cleaned text from Azure for file: {clean_file_name}") - # Extract metadata + # Step 2: Tokenize the text using tiktoken + tokens = tokenizer.encode(clean_text) + + # Step 3: Chunk tokens into groups of max_chunk_size (default: 250 tokens per chunk) + chunks = [ + tokenizer.decode(tokens[i:i + max_chunk_size]) + for i in range(0, len(tokens), max_chunk_size) + ] + print(f"Tokenized and split text into {len(chunks)} chunks of {max_chunk_size} tokens each.") + + # Extract metadata for embedding meeting_date = str(metadata["meeting_date"]) meeting_type = metadata["meeting_type"] file_type = metadata["file_type"] - # Step 2: Check for existing documents in Weaviate with the same metadata and delete them + # Step 4: Check and delete existing embeddings in Weaviate (to prevent duplication) query = f""" {{ Get {{ @@ -54,25 +69,34 @@ def process_and_embed_text(clean_file_name, metadata, chunk_size=500): response = client.query.raw(query) existing_documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", []) - # Step 3: Delete any existing documents with matching metadata in Weaviate for doc in existing_documents: client.data_object.delete(doc["id"]) - print(f"Deleted {len(existing_documents)} existing documents with matching metadata.") + print(f"Deleted {len(existing_documents)} existing embeddings for this file.") + + # Step 5: Embed each chunk using OpenAI and store in Weaviate + for i, chunk in enumerate(chunks): + # Generate embedding using OpenAI + response = openai_client.embeddings.create( + input=chunk, + model="text-embedding-ada-002" + ) + embedding = response.data[0].embedding # Correctly access embedding from the response object - # Step 4: Embed and store new chunks in Weaviate - for chunk in chunks: + # Upload chunk to Weaviate client.data_object.create( data_object={ "content": chunk, "meeting_date": meeting_date, "meeting_type": meeting_type, - "file_type": file_type + "file_type": file_type, + "chunk_index": i # Include chunk index for ordering }, + vector=embedding, class_name="MeetingDocument" ) - print(f"Embedded chunk for {clean_file_name} in Weaviate.") + print(f"Uploaded chunk {i+1}/{len(chunks)} to Weaviate.") - print(f"Successfully embedded {len(chunks)} chunks for {clean_file_name}.") + print("Successfully processed and embedded all chunks.") except Exception as e: - print(f"Error during chunking and embedding: {e}") + print(f"Error during tokenization and embedding: {e}") diff --git a/Preprocessing/preprocessing_pipeline/text_cleaning.py b/Preprocessing/preprocessing_pipeline/text_cleaning.py index f162b8fa..a9912220 100644 --- a/Preprocessing/preprocessing_pipeline/text_cleaning.py +++ b/Preprocessing/preprocessing_pipeline/text_cleaning.py @@ -1,5 +1,6 @@ import os from openai import OpenAI +import tiktoken # Use tiktoken for OpenAI-compatible tokenization from utils.env_setup import load_env from utils.azure_blob_utils import download_from_azure, upload_to_azure @@ -7,9 +8,12 @@ load_env() client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) -def split_text(text, max_chunk_size=3000): +# Initialize tiktoken for OpenAI's GPT models +tokenizer = 
tiktoken.encoding_for_model("gpt-3.5-turbo") # Specify the OpenAI model + +def tokenize_and_split_text(text, max_chunk_size=250): """ - Splits a large text into smaller chunks, each within the specified token size. + Tokenizes and splits text into smaller chunks within the token size limit. Args: text (str): The text to split. @@ -18,22 +22,14 @@ def split_text(text, max_chunk_size=3000): Returns: list of str: List of smaller text chunks. """ - chunks = [] - words = text.split() - chunk = [] - current_size = 0 - - for word in words: - current_size += len(word) + 1 # +1 accounts for spaces - if current_size > max_chunk_size: - chunks.append(" ".join(chunk)) - chunk = [] - current_size = len(word) + 1 - chunk.append(word) - - if chunk: - chunks.append(" ".join(chunk)) + # Tokenize the text into tokens + tokens = tokenizer.encode(text) + # Split tokens into chunks of max_chunk_size + chunks = [ + tokenizer.decode(tokens[i:i + max_chunk_size]) + for i in range(0, len(tokens), max_chunk_size) + ] return chunks def clean_text_chunk(chunk): @@ -56,7 +52,7 @@ def clean_text_chunk(chunk): ] response = client.chat.completions.create( - model="gpt-4", + model="gpt-3.5-turbo", messages=messages, max_tokens=2000, temperature=0.5 @@ -76,8 +72,8 @@ def clean_text(dirty_file_name): print(f"Downloading {dirty_file_name} from Azure Blob Storage...") dirty_content = download_from_azure("dirty", dirty_file_name) - # Split the text into chunks - chunks = split_text(dirty_content, max_chunk_size=3000) + # Tokenize and split the text into chunks of 250 tokens + chunks = tokenize_and_split_text(dirty_content, max_chunk_size=250) cleaned_chunks = [] for i, chunk in enumerate(chunks): From 94bfff61d9c12baa6017caee432ad55abf631ee1 Mon Sep 17 00:00:00 2001 From: Riley LePrell Date: Thu, 21 Nov 2024 18:32:49 -0500 Subject: [PATCH 5/5] Formatting + Upload Size - Files were capped at 200mb; Increased this to 1 gig. 
From 94bfff61d9c12baa6017caee432ad55abf631ee1 Mon Sep 17 00:00:00 2001
From: Riley LePrell
Date: Thu, 21 Nov 2024 18:32:49 -0500
Subject: [PATCH 5/5] Formatting + Upload Size

- Uploads were capped at 200 MB; increased the limit to 1 GB.
- Fixed formatting issues and some problems with Streamlit not running.
---
 Preprocessing/.streamlit/config.toml |   2 +
 Preprocessing/App/main.py            | 104 +++++++++++----------
 2 files changed, 44 insertions(+), 62 deletions(-)
 create mode 100644 Preprocessing/.streamlit/config.toml

diff --git a/Preprocessing/.streamlit/config.toml b/Preprocessing/.streamlit/config.toml
new file mode 100644
index 00000000..f41f6e6f
--- /dev/null
+++ b/Preprocessing/.streamlit/config.toml
@@ -0,0 +1,2 @@
+[server]
+maxUploadSize = 1000  # Set the upload size limit in MB
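
Reviewer note: maxUploadSize raises Streamlit's server-side cap but performs no validation of its own. If a friendlier in-app error is ever wanted, a hypothetical guard (not part of this patch; UploadedFile.size is in bytes) could look like:

    MAX_UPLOAD_MB = 1000  # keep in sync with .streamlit/config.toml

    if file is not None and file.size > MAX_UPLOAD_MB * 1024 * 1024:
        st.error(f"File exceeds the {MAX_UPLOAD_MB} MB upload limit.")
        st.stop()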
diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py
index 36d5f9a4..4d8d5a2e 100644
--- a/Preprocessing/App/main.py
+++ b/Preprocessing/App/main.py
@@ -1,11 +1,10 @@
 # Standard Python imports
 import os
 import sys
+from datetime import datetime

 # Load environment variables and set Python path
 from dotenv import load_dotenv
-
-# Load environment variables from .env
 load_dotenv()

 # Set PYTHONPATH from .env if available
@@ -13,16 +12,18 @@
 if python_path:
     sys.path.append(python_path)

-# Now import all other dependencies
-from datetime import datetime
+# Import dependencies
 import streamlit as st
 import weaviate  # Import Weaviate client
 from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text
 from preprocessing_pipeline.audio_transcription import transcribe_audio
 from preprocessing_pipeline.text_cleaning import clean_text
 from preprocessing_pipeline.chunking_vector_embedding import tokenize_and_embed_text
-from utils.azure_blob_utils import upload_to_azure, download_from_azure
-from utils.azure_blob_utils import list_blobs_in_folder, download_from_azure
+from utils.azure_blob_utils import (
+    upload_to_azure,
+    download_from_azure,
+    list_blobs_in_folder
+)

 # Set up Weaviate client
 client = weaviate.Client(
@@ -30,23 +31,22 @@
     auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY"))
 )

-# Generate standardized file names
+# Helper function: Generate standardized file names
 def generate_file_name(metadata, stage):
     meeting_date = metadata["meeting_date"].strftime("%Y_%m_%d")
     meeting_type = "BOC" if metadata["meeting_type"] == "Board of Commissioners" else "PB"
     file_type = metadata["file_type"]
     return f"{meeting_date}_{meeting_type}_{file_type}_{stage}"

-# Check and overwrite files in the local storage
+# Helper function: Check and overwrite files in local storage
 def save_file_with_overwrite(file_path, content):
     if os.path.exists(file_path):
         os.remove(file_path)  # Overwrite existing file
     with open(file_path, "w") as f:
         f.write(content)

-# Fetch documents from Weaviate
+# Helper function: Fetch documents from Weaviate
 def fetch_uploaded_documents():
-    # Query Weaviate for documents
     query = """
     {
         Get {
@@ -65,27 +65,24 @@
     documents = response.get("data", {}).get("Get", {}).get("Documents", [])
     return documents

-# Define pages
+# Home Page
 def home_page():
-    # Apply custom styling with IBM Plex Mono
-    st.markdown(f"""
+    # Custom styling with IBM Plex Mono
+    st.markdown("""
         [CSS and landing-page markup elided in this view; the header text reads "Minute Mate"]
@@ -130,19 +122,17 @@ def home_page():
         [landing-page markup elided in this view]
""", unsafe_allow_html=True) - - # Navigation buttons (centered) - col1, col2 = st.columns([1, 1]) + # Navigation buttons + col1, col2 = st.columns([1, 1]) with col1: if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"): st.session_state.page = "upload" - with col2: if st.button("View Documents", key="view", help="View the documents that have been uploaded"): st.session_state.page = "view" -# Define pages +# Upload Files Page def upload_files_page(): st.title("Upload Municipal Meeting Documents") @@ -172,7 +162,7 @@ def upload_files_page(): if file and "metadata" in st.session_state: metadata = st.session_state["metadata"] - + # Preserve the original file extension file_extension = os.path.splitext(file.name)[1] raw_file_name = f"{generate_file_name(metadata, 'Raw')}{file_extension}" @@ -184,7 +174,6 @@ def upload_files_page(): # Stage 2: Process based on file type if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]: - # Transcribe audio with st.spinner(f"Transcribing audio using {metadata['model']} model..."): transcribed_text = transcribe_audio( raw_file_name=raw_file_name, @@ -201,7 +190,6 @@ def upload_files_page(): st.error("Failed to transcribe the audio.") elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf": - # Extract text from PDF with st.spinner("Extracting text from PDF..."): extracted_text = convert_pdf_to_text(raw_file_name) if extracted_text: @@ -216,7 +204,7 @@ def upload_files_page(): # Stage 3: Clean Text and Upload to Clean dirty_content = download_from_azure("dirty", dirty_file_name) with st.spinner("Cleaning text using generative AI..."): - cleaned_text = clean_text(dirty_file_name) # Updated to handle chunked cleaning + cleaned_text = clean_text(dirty_file_name) clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt" upload_to_azure("clean", clean_file_name, cleaned_text) st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}") @@ -227,11 +215,11 @@ def upload_files_page(): # Stage 4: Chunk and Embed into Weaviate with st.spinner("Chunking and embedding text into Weaviate..."): - tokenize_and_embed_text(clean_file_name, metadata) # Call the combined chunking and embedding function + tokenize_and_embed_text(clean_file_name, metadata) st.success("Document processed and embedded successfully!") progress_bar.progress(100) - # Navigation buttons (centered) + # Navigation buttons col1, col2 = st.columns([1, 1]) with col1: if st.button("Return Home"): @@ -240,18 +228,15 @@ def upload_files_page(): if st.button("View Documents"): st.session_state.page = "view" -# Define the view_documents_page function +# View Documents Page def view_documents_page(): st.title("Uploaded Documents") - - # Fetch files from the Azure Blob Storage try: - # List blobs in the 'raw', 'dirty', and 'clean' folders raw_blobs = list_blobs_in_folder("raw") dirty_blobs = list_blobs_in_folder("dirty") clean_blobs = list_blobs_in_folder("clean") - # Display documents from 'raw' folder + # Display documents by category if raw_blobs: st.subheader("Raw Documents") for blob in raw_blobs: @@ -260,7 +245,6 @@ def view_documents_page(): file_content = download_from_azure("raw", blob) st.download_button("Download", data=file_content, file_name=blob) - # Display documents from 'dirty' folder if dirty_blobs: st.subheader("Dirty Documents") for blob in dirty_blobs: @@ -269,7 +253,6 @@ def view_documents_page(): file_content = download_from_azure("dirty", blob) st.download_button("Download", 
data=file_content, file_name=blob) - # Display documents from 'clean' folder if clean_blobs: st.subheader("Clean Documents") for blob in clean_blobs: @@ -278,14 +261,12 @@ def view_documents_page(): file_content = download_from_azure("clean", blob) st.download_button("Download", data=file_content, file_name=blob) - # If no files are found in any folder if not raw_blobs and not dirty_blobs and not clean_blobs: st.write("No documents found in the Azure Blob Storage.") - except Exception as e: - st.error(f"Error fetching documents from Azure Blob Storage: {e}") - - # Navigation buttons (centered) + st.error(f"Error fetching documents from Azure Blob Storage: {e}") + + # Navigation buttons col1, col2 = st.columns([1, 1]) with col1: if st.button("Return Home"): @@ -294,8 +275,7 @@ def view_documents_page(): if st.button("Upload Files"): st.session_state.page = "upload" - -# Main page selection +# Main page selection logic if "page" not in st.session_state: st.session_state.page = "home"
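
Reviewer note: the session-state default above feeds a page dispatch that presumably follows this hunk. A sketch of that routing, assuming the three page functions defined earlier:

    page = st.session_state.page
    if page == "home":
        home_page()
    elif page == "upload":
        upload_files_page()
    elif page == "view":
        view_documents_page()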