Skip to content

Commit

Permalink
Cloud Functionality
Browse files Browse the repository at this point in the history
- Create Azure Blob storage; Streamlit interacts with it and stores files at different stages of the preprocessing pipeline.

- Pipeline is up and running

- View Document Page to see files uploaded
  • Loading branch information
RileyLePrell committed Nov 18, 2024
1 parent c194a6f commit 305cbb2
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 232 deletions.
237 changes: 117 additions & 120 deletions Preprocessing/App/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,30 @@
from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text

Check failure on line 20 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E402)

Preprocessing/App/main.py:20:1: E402 Module level import not at top of file
from preprocessing_pipeline.audio_transcription import transcribe_audio

Check failure on line 21 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E402)

Preprocessing/App/main.py:21:1: E402 Module level import not at top of file
from preprocessing_pipeline.text_cleaning import clean_text

Check failure on line 22 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E402)

Preprocessing/App/main.py:22:1: E402 Module level import not at top of file
from preprocessing_pipeline.chunking_tokenization import process_text_chunks
from preprocessing_pipeline.vector_embedding import embed_text
from preprocessing_pipeline.chunking_vector_embedding import process_and_embed_text

Check failure on line 23 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E402)

Preprocessing/App/main.py:23:1: E402 Module level import not at top of file
from utils.azure_blob_utils import upload_to_azure, download_from_azure

Check failure on line 24 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E402)

Preprocessing/App/main.py:24:1: E402 Module level import not at top of file
from utils.azure_blob_utils import list_blobs_in_folder, download_from_azure

Check failure on line 25 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E402)

Preprocessing/App/main.py:25:1: E402 Module level import not at top of file

Check failure on line 25 in Preprocessing/App/main.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F811)

Preprocessing/App/main.py:25:58: F811 Redefinition of unused `download_from_azure` from line 24

# Set up Weaviate client
# Connects to the Weaviate instance addressed by the WEAVIATE_URL env var,
# authenticating with the API key read from WEAVIATE_API_KEY.
# NOTE(review): os.getenv returns None when a variable is unset, which would
# make this module-level construction fail at import time — confirm the env
# is validated before this module is imported.
client = weaviate.Client(
    url=os.getenv("WEAVIATE_URL"),
    auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY"))
)

# Generate standardized file names
def generate_file_name(metadata, stage):
    """Build the standardized name for a file at a given pipeline stage.

    Joins the meeting date (YYYY_MM_DD), an abbreviated meeting type
    ("BOC" for "Board of Commissioners", "PB" otherwise), the file type,
    and the processing stage with underscores:
    '<date>_<type>_<filetype>_<stage>'.
    """
    date_part = metadata["meeting_date"].strftime("%Y_%m_%d")
    if metadata["meeting_type"] == "Board of Commissioners":
        type_abbrev = "BOC"
    else:
        type_abbrev = "PB"
    return "_".join((date_part, type_abbrev, metadata["file_type"], stage))

# Check and overwrite files in the local storage
def save_file_with_overwrite(file_path, content):
    """Write *content* to *file_path*, replacing any existing file.

    Opening with mode "w" already truncates an existing file, so the
    previous explicit os.remove() call was redundant (and race-prone)
    and has been dropped.  An explicit UTF-8 encoding avoids writing
    with a platform-dependent default encoding.

    Args:
        file_path: Destination path on local storage.
        content: Text to write.
    """
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)

# Fetch documents from Weaviate
def fetch_uploaded_documents():
# Query Weaviate for documents
Expand Down Expand Up @@ -127,22 +142,19 @@ def home_page():
if st.button("View Documents", key="view", help="View the documents that have been uploaded"):
st.session_state.page = "view"

# Define pages
def upload_files_page():
st.title("Upload Municipal Meeting Documents")

# Sidebar for metadata and options selection
st.sidebar.header("Document Metadata & Transcription Options")

# Metadata Input Fields
meeting_date = st.sidebar.date_input("Select Meeting Date", datetime.today())
meeting_type = st.sidebar.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"])
file_type = st.sidebar.radio("File Type", ["Agenda", "Minutes", "Audio"])

# Transcription Model and Language Options
model_option = st.sidebar.selectbox("Select Transcription Model", ["default", "best", "nano"])
speaker_labels = st.sidebar.checkbox("Enable Speaker Diarization")

# Save Metadata Button
# Save metadata
if st.sidebar.button("Save Metadata"):
st.session_state["metadata"] = {
"meeting_date": meeting_date,
Expand All @@ -160,73 +172,64 @@ def upload_files_page():

if file and "metadata" in st.session_state:
metadata = st.session_state["metadata"]
progress_bar.progress(10)
st.write("Stage: Metadata Saved")

if metadata["file_type"] in ["Agenda", "Minutes"] and file.type == "application/pdf":
# Stage: PDF to Dirty Text Conversion
with st.spinner("Converting PDF to text..."):
dirty_text = convert_pdf_to_text(file)

pdf_text_path = "pdf_text_output.txt"
with open(pdf_text_path, "w") as f:
f.write(dirty_text)

progress_bar.progress(30)
st.write("Stage: PDF Conversion Complete")

# Display and download PDF text conversion
st.text_area("Converted PDF Text:", dirty_text, height=200)
st.download_button("Download PDF Text", data=dirty_text, file_name=pdf_text_path)

elif metadata["file_type"] == "Audio" and file.type in ["audio/mpeg", "audio/wav"]:
# Stage: Audio Transcription with selected model and speaker labels

# Preserve the original file extension
file_extension = os.path.splitext(file.name)[1]
raw_file_name = f"{generate_file_name(metadata, 'Raw')}{file_extension}"

# Stage 1: Upload to Raw
upload_to_azure("raw", raw_file_name, file.read())
st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}")
progress_bar.progress(20)

# Stage 2: Process based on file type
if metadata["file_type"] == "Audio" and file_extension in [".mp3", ".wav"]:
# Transcribe audio
with st.spinner(f"Transcribing audio using {metadata['model']} model..."):
dirty_text = transcribe_audio(file, model=metadata["model"], speaker_labels=metadata["speaker_labels"])

transcription_path = "transcription_output.txt"
with open(transcription_path, "w") as f:
f.write(dirty_text)

progress_bar.progress(30)
st.write("Stage: Audio Transcription Complete")

# Display and download transcription
st.text_area("Audio Transcription:", dirty_text, height=200)
st.download_button("Download Transcription", data=dirty_text, file_name=transcription_path)

# Continue processing if dirty_text was successfully created
if dirty_text:
# Stage: Text Cleaning
with st.spinner("Cleaning text with generative AI..."):
partly_clean_text = clean_text(dirty_text)

cleaned_text_path = "cleaned_text_output.txt"
with open(cleaned_text_path, "w") as f:
f.write(partly_clean_text)

progress_bar.progress(60)
st.write("Stage: Text Cleaning Complete")

# Display and download cleaned text
st.text_area("Cleaned Text:", partly_clean_text, height=200)
st.download_button("Download Cleaned Text", data=partly_clean_text, file_name=cleaned_text_path)

# Stage: Chunking and Tokenization
with st.spinner("Chunking and tokenizing text..."):
text_chunks = process_text_chunks(partly_clean_text)
progress_bar.progress(80)
st.write("Stage: Chunking and Tokenization Complete")

# Stage: Embedding and Storage
with st.spinner("Embedding and storing in Weaviate..."):
embed_text(text_chunks, metadata)
progress_bar.progress(100)
st.write("Stage: Embedding and Storage Complete")

st.success("Document processed and embedded with metadata!")
else:
st.error("Failed to process the document.")
transcribed_text = transcribe_audio(
raw_file_name=raw_file_name,
model=metadata["model"],
speaker_labels=metadata["speaker_labels"]
)
if transcribed_text:
dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt"
upload_to_azure("dirty", dirty_file_name, transcribed_text)
st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}")
st.text_area("Transcribed Audio Text:", transcribed_text, height=200)
st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name)
else:
st.error("Failed to transcribe the audio.")

elif metadata["file_type"] in ["Agenda", "Minutes"] and file_extension == ".pdf":
# Extract text from PDF
with st.spinner("Extracting text from PDF..."):
extracted_text = convert_pdf_to_text(raw_file_name)
if extracted_text:
dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt"
upload_to_azure("dirty", dirty_file_name, extracted_text)
st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}")
st.text_area("Extracted PDF Text:", extracted_text, height=200)
st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name)
else:
st.error("Failed to extract text from the PDF.")

# Stage 3: Clean Text and Upload to Clean
dirty_content = download_from_azure("dirty", dirty_file_name)
with st.spinner("Cleaning text using generative AI..."):
cleaned_text = clean_text(dirty_file_name) # Updated to handle chunked cleaning
clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt"
upload_to_azure("clean", clean_file_name, cleaned_text)
st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}")

# Display cleaned text
st.text_area("Cleaned Text:", cleaned_text, height=200)
st.download_button("Download Cleaned Text", data=cleaned_text, file_name=clean_file_name)

# Stage 4: Chunk and Embed into Weaviate
with st.spinner("Chunking and embedding text into Weaviate..."):
process_and_embed_text(clean_file_name, metadata) # Call the combined chunking and embedding function
st.success("Document processed and embedded successfully!")
progress_bar.progress(100)

# Navigation buttons (centered)
col1, col2 = st.columns([1, 1])
Expand All @@ -237,50 +240,51 @@ def upload_files_page():
if st.button("View Documents"):
st.session_state.page = "view"

# Define the view_documents_page function
def view_documents_page():
st.title("Uploaded Documents")

# Retrieve Weaviate URL and API Key from environment variables
weaviate_url = os.getenv("WEAVIATE_URL")
weaviate_api_key = os.getenv("WEAVIATE_API_KEY")

if not weaviate_url or not weaviate_api_key:
st.error("Weaviate connection details (URL or API Key) are missing.")
return

# Initialize Weaviate client with API key for authentication
client = weaviate.Client(
url=weaviate_url,
auth_client_secret=weaviate_api_key
)

# Fetch all objects from Weaviate
# Fetch files from the Azure Blob Storage
try:
# Get all objects from the collection (assuming "Documents" is the name of your collection)
result = client.data_object.get(class_name="Documents", properties=["file_name", "file_type", "meeting_date", "meeting_type", "clean_text", "chunks"])

if result['objects']:
for item in result['objects']:
file_name = item['properties'].get('file_name', 'N/A')
file_type = item['properties'].get('file_type', 'N/A')
meeting_date = item['properties'].get('meeting_date', 'N/A')
meeting_type = item['properties'].get('meeting_type', 'N/A')
clean_text = item['properties'].get('clean_text', 'No clean text available')
chunks = item['properties'].get('chunks', 'No chunks available')

# Display the document details in Streamlit
st.subheader(f"Document: {file_name}")
st.write(f"**File Type:** {file_type}")
st.write(f"**Meeting Date:** {meeting_date}")
st.write(f"**Meeting Type:** {meeting_type}")
st.write(f"**Clean Text:** {clean_text[:300]}...") # Show a preview of the clean text
st.write(f"**Chunks:** {chunks[:300]}...") # Show a preview of the chunks
st.write("---")
else:
st.write("No documents found in the Weaviate database.")
except Exception as e:
st.error(f"Error fetching documents from Weaviate: {e}")
# List blobs in the 'raw', 'dirty', and 'clean' folders
raw_blobs = list_blobs_in_folder("raw")
dirty_blobs = list_blobs_in_folder("dirty")
clean_blobs = list_blobs_in_folder("clean")

# Display documents from 'raw' folder
if raw_blobs:
st.subheader("Raw Documents")
for blob in raw_blobs:
st.write(f"- {blob}")
if st.button(f"Download {blob}", key=f"download_raw_{blob}"):
file_content = download_from_azure("raw", blob)
st.download_button("Download", data=file_content, file_name=blob)

# Display documents from 'dirty' folder
if dirty_blobs:
st.subheader("Dirty Documents")
for blob in dirty_blobs:
st.write(f"- {blob}")
if st.button(f"Download {blob}", key=f"download_dirty_{blob}"):
file_content = download_from_azure("dirty", blob)
st.download_button("Download", data=file_content, file_name=blob)

# Display documents from 'clean' folder
if clean_blobs:
st.subheader("Clean Documents")
for blob in clean_blobs:
st.write(f"- {blob}")
if st.button(f"Download {blob}", key=f"download_clean_{blob}"):
file_content = download_from_azure("clean", blob)
st.download_button("Download", data=file_content, file_name=blob)

# If no files are found in any folder
if not raw_blobs and not dirty_blobs and not clean_blobs:
st.write("No documents found in the Azure Blob Storage.")

except Exception as e:
st.error(f"Error fetching documents from Azure Blob Storage: {e}")

# Navigation buttons (centered)
col1, col2 = st.columns([1, 1])
with col1:
Expand All @@ -289,14 +293,7 @@ def view_documents_page():
with col2:
if st.button("Upload Files"):
st.session_state.page = "upload"
# Navigation buttons (centered)
col1, col2 = st.columns([1, 1])
with col1:
if st.button("Return Home"):
st.session_state.page = "home"
with col2:
if st.button("Upload Files"):
st.session_state.page = "upload"


# Main page selection
if "page" not in st.session_state:
Expand Down
8 changes: 4 additions & 4 deletions Preprocessing/docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ weaviate-client==4.7.1
openai==1.54.3

# Streamlit for web UI
streamlit==1.16.0
streamlit

# PDF handling (for PDF to text conversion)
PyMuPDF==1.18.19
PyMuPDF

# OpenAI API client (if you're using GPT models for text cleaning)
openai==1.54.3
# azure portal
azure-storage-blob
Loading

0 comments on commit 305cbb2

Please sign in to comment.