Merge pull request #60 from dsba6010-llm-applications/preprocess_updates
Updated File Fix + Updated Env + Add a Few Tests
neal-logan authored Dec 10, 2024
2 parents db36126 + d4198c5 commit 320edc9
Showing 7 changed files with 556 additions and 57 deletions.
1 change: 1 addition & 0 deletions Preprocessing/.env.example
@@ -11,6 +11,7 @@ ASSEMBLY_AI_KEY =

#Azure Storage Container Connection
AZURE_STORAGE_CONNECTION_STRING=
AZURE_STORAGE_CONTAINER =

# Pathing Setup
PYTHONPATH=
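A minimal sketch (not part of this commit) of how the new AZURE_STORAGE_CONTAINER value might be read alongside the existing connection string. The helper below is illustrative and assumes python-dotenv plus the azure-storage-blob SDK; it is not the repository's actual utils.azure_blob_utils module.

import os

from azure.storage.blob import BlobServiceClient  # assumed dependency
from dotenv import load_dotenv

load_dotenv()  # pulls in the .env entries shown above


def get_container_client():
    # Hypothetical helper: both values come from the .env variables in this diff.
    connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    container_name = os.getenv("AZURE_STORAGE_CONTAINER")
    service = BlobServiceClient.from_connection_string(connection_string)
    return service.get_container_client(container_name)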
41 changes: 22 additions & 19 deletions Preprocessing/App/main.py
@@ -18,7 +18,11 @@
from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text

Check failure on line 18 (GitHub Actions / ruff): Preprocessing/App/main.py:18:1: E402 Module level import not at top of file
from preprocessing_pipeline.audio_transcription import transcribe_audio

Check failure on line 19 (GitHub Actions / ruff): Preprocessing/App/main.py:19:1: E402 Module level import not at top of file
from preprocessing_pipeline.text_cleaning import clean_text

Check failure on line 20 (GitHub Actions / ruff): Preprocessing/App/main.py:20:1: E402 Module level import not at top of file
from preprocessing_pipeline.chunking_vector_embedding import tokenize_and_embed_text
from preprocessing_pipeline.chunking_vector_embedding import (
tokenize_and_embed_text,
fetch_matching_chunks,
delete_matching_chunks
)

Check failure on line 25 (GitHub Actions / ruff): Preprocessing/App/main.py:21:1: E402 Module level import not at top of file
from utils.azure_blob_utils import (
upload_to_azure,
download_from_azure,
@@ -209,11 +213,21 @@ def upload_files_page():
upload_to_azure("clean", clean_file_name, cleaned_text)
st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}")

# Display cleaned text
st.text_area("Cleaned Text:", cleaned_text, height=200)
st.download_button("Download Cleaned Text", data=cleaned_text, file_name=clean_file_name)
# Stage 4: Check and Delete Existing Embeddings
with st.spinner("Checking for existing embeddings..."):
matching_chunks = fetch_matching_chunks(
str(metadata["meeting_date"]),
metadata["meeting_type"],
metadata["file_type"],
clean_file_name
)
if matching_chunks:
st.write(f"Found {len(matching_chunks)} existing chunks. Deleting...")
delete_matching_chunks(matching_chunks)
else:
st.write("No existing chunks found.")

# Stage 4: Chunk and Embed into Weaviate
# Stage 5: Chunk and Embed into Weaviate
with st.spinner("Chunking and embedding text into Weaviate..."):
tokenize_and_embed_text(clean_file_name, metadata)
st.success("Document processed and embedded successfully!")
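For reference, a condensed sketch (not part of the diff) of the new Stage 4/5 sequence added above, stripped of the Streamlit UI. The wrapper function and the metadata dict shape are assumptions, but the calls mirror the imports introduced in this commit. Note that tokenize_and_embed_text appears to repeat the same existence check internally (see the chunking file below), so the explicit Stage 4 here mainly surfaces the chunk count to the user.

from preprocessing_pipeline.chunking_vector_embedding import (
    tokenize_and_embed_text,
    fetch_matching_chunks,
    delete_matching_chunks
)


def reembed_document(clean_file_name, metadata):
    # Stage 4: drop any chunks already embedded for this exact document.
    matching_chunks = fetch_matching_chunks(
        str(metadata["meeting_date"]),
        metadata["meeting_type"],
        metadata["file_type"],
        clean_file_name
    )
    if matching_chunks:
        delete_matching_chunks(matching_chunks)
    # Stage 5: chunk, embed, and upload the cleaned text to Weaviate.
    tokenize_and_embed_text(clean_file_name, metadata)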
@@ -242,33 +256,23 @@ def group_blobs_by_date(blobs):
grouped = {}
for blob in blobs:
try:
# Extract the file name without folder prefix (e.g., "raw/")
file_name = blob.split("/")[-1] # Get only the file name part

# Extract the date from the file name (assuming format: YYYY_MM_DD)
parts = file_name.split("_") # Split into ['2023', '12', '12', 'BOC', 'Agenda', ...]
file_name = blob.split("/")[-1] # Extract the file name
parts = file_name.split("_") # Split into parts: ['2023', '12', '12', 'BOC', 'Agenda', ...]
date_str = "_".join(parts[:3]) # Join the first three parts: '2023_12_12'

# Convert the date string to a readable format
readable_date = datetime.strptime(date_str, "%Y_%m_%d").strftime("%B %d, %Y")

# Group by the readable date
if readable_date not in grouped:
grouped[readable_date] = []
grouped[readable_date].append(blob)
except (ValueError, IndexError):
# Handle files with unexpected formats
if "Unknown Date" not in grouped:
grouped["Unknown Date"] = []
grouped["Unknown Date"].append(blob)
return grouped

# Group blobs by date
raw_grouped = group_blobs_by_date(raw_blobs)
dirty_grouped = group_blobs_by_date(dirty_blobs)
clean_grouped = group_blobs_by_date(clean_blobs)

# Function to display blobs within a group
def display_grouped_blobs(grouped_blobs, category):
if grouped_blobs:
st.subheader(f"{category.capitalize()} Documents")
@@ -282,7 +286,6 @@ def display_grouped_blobs(grouped_blobs, category):
else:
st.info(f"No documents found in the {category} category.")

# Display grouped blobs
display_grouped_blobs(raw_grouped, "raw")
display_grouped_blobs(dirty_grouped, "dirty")
display_grouped_blobs(clean_grouped, "clean")
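A quick worked example (illustrative, not part of the diff) of the date parsing performed in group_blobs_by_date above, assuming the YYYY_MM_DD_... file-name convention used in this repository:

from datetime import datetime

blob = "raw/2023_12_12_BOC_Agenda.pdf"            # hypothetical blob name
file_name = blob.split("/")[-1]                   # '2023_12_12_BOC_Agenda.pdf'
date_str = "_".join(file_name.split("_")[:3])     # '2023_12_12'
readable_date = datetime.strptime(date_str, "%Y_%m_%d").strftime("%B %d, %Y")
print(readable_date)                              # December 12, 2023

Blobs whose names do not begin with a parsable date fall into the "Unknown Date" group.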
@@ -308,4 +311,4 @@ def display_grouped_blobs(grouped_blobs, category):
elif st.session_state.page == "upload":
upload_files_page()
elif st.session_state.page == "view":
view_documents_page()
view_documents_page()
104 changes: 66 additions & 38 deletions Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py
@@ -23,6 +23,56 @@
# Initialize tiktoken for OpenAI's embedding model
tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")


def fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document):
"""
Fetch matching chunks from Weaviate based on metadata.
Args:
meeting_date (str): Date of the meeting.
meeting_type (str): Type of the meeting (e.g., "Board of Commissioners").
file_type (str): File type (e.g., "Minutes").
source_document (str): Name of the source document.
Returns:
list: A list of matching documents.
"""
query = f"""
{{
Get {{
MeetingDocument(where: {{
operator: And,
operands: [
{{ path: ["meeting_date"], operator: Equal, valueString: "{meeting_date}" }},
{{ path: ["meeting_type"], operator: Equal, valueString: "{meeting_type}" }},
{{ path: ["file_type"], operator: Equal, valueString: "{file_type}" }},
{{ path: ["source_document"], operator: Equal, valueString: "{source_document}" }}
]
}}) {{
_additional {{
id
}}
}}
}}
}}
"""
response = client.query.raw(query)
return response.get("data", {}).get("Get", {}).get("MeetingDocument", [])


def delete_matching_chunks(documents):
"""
Delete matching chunks from Weaviate.
Args:
documents (list): List of documents with IDs to delete.
"""
for doc in documents:
doc_id = doc["_additional"]["id"]
client.data_object.delete(doc_id)
print(f"Deleted chunk ID: {doc_id}")


def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250):
"""
Tokenizes, chunks, and embeds cleaned text into Weaviate.
@@ -33,68 +83,46 @@ def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250):
max_chunk_size (int): Maximum token size for each chunk.
"""
try:
# Step 1: Download cleaned text from Azure
# Download cleaned text from Azure
clean_text = download_from_azure("clean", clean_file_name)
print(f"Downloaded cleaned text from Azure for file: {clean_file_name}")

# Step 2: Tokenize the text using tiktoken
tokens = tokenizer.encode(clean_text)

# Step 3: Chunk tokens into groups of max_chunk_size (default: 250 tokens per chunk)
chunks = [
tokenizer.decode(tokens[i:i + max_chunk_size])
for i in range(0, len(tokens), max_chunk_size)
]
print(f"Tokenized and split text into {len(chunks)} chunks of {max_chunk_size} tokens each.")

# Extract metadata for embedding
# Metadata fields
meeting_date = str(metadata["meeting_date"])
meeting_type = metadata["meeting_type"]
file_type = metadata["file_type"]
source_document = clean_file_name

# Step 4: Check and delete existing embeddings in Weaviate (to prevent duplication)
query = f"""
{{
Get {{
MeetingDocument(where: {{
path: ["meeting_date", "meeting_type", "file_type"],
operator: And,
valueString: "{meeting_date}"
}}) {{
id
}}
}}
}}
"""
response = client.query.raw(query)
existing_documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", [])

for doc in existing_documents:
client.data_object.delete(doc["id"])
print(f"Deleted {len(existing_documents)} existing embeddings for this file.")
# Check for existing embeddings
matching_chunks = fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document)
if matching_chunks:
print(f"Found {len(matching_chunks)} existing chunks. Deleting...")
delete_matching_chunks(matching_chunks)
else:
print("No existing chunks found.")

# Step 5: Embed each chunk using OpenAI and store in Weaviate
# Embed and upload each chunk
for i, chunk in enumerate(chunks):
# Generate embedding using OpenAI
response = openai_client.embeddings.create(
input=chunk,
model="text-embedding-ada-002"
)
embedding = response.data[0].embedding # Correctly access embedding from the response object
response = openai_client.embeddings.create(input=chunk, model="text-embedding-ada-002")
embedding = response.data[0].embedding

# Upload chunk to Weaviate
client.data_object.create(
data_object={
"content": chunk,
"meeting_date": meeting_date,
"meeting_type": meeting_type,
"file_type": file_type,
"chunk_index": i # Include chunk index for ordering
"chunk_index": i,
"source_document": source_document
},
vector=embedding,
class_name="MeetingDocument"
)
print(f"Uploaded chunk {i+1}/{len(chunks)} to Weaviate.")
print(f"Uploaded chunk {i + 1}/{len(chunks)} to Weaviate.")

print("Successfully processed and embedded all chunks.")

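A standalone sketch (not part of the diff) of the fixed-size token chunking that tokenize_and_embed_text performs above: encode with tiktoken, slice into windows of max_chunk_size tokens, and decode each slice back to text. The helper name is illustrative.

import tiktoken

tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")


def chunk_text(text, max_chunk_size=250):
    # Encode once, then take non-overlapping windows of max_chunk_size tokens.
    tokens = tokenizer.encode(text)
    return [
        tokenizer.decode(tokens[i:i + max_chunk_size])
        for i in range(0, len(tokens), max_chunk_size)
    ]

# chunks = chunk_text(clean_text)  # a long cleaned transcript becomes a list of ~250-token strings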
Binary file not shown.
87 changes: 87 additions & 0 deletions Preprocessing/tests/metadata_by_date.py
@@ -0,0 +1,87 @@
# This test script finds all chunks for a specific meeting date and exports their metadata to a Word document.

import os
import weaviate
from dotenv import load_dotenv
from docx import Document

# Load environment variables from .env
load_dotenv()

# Initialize Weaviate client
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
client = weaviate.Client(
url=WEAVIATE_URL,
auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
)

def fetch_documents_by_date_and_export_to_word(date):
"""
Fetch documents from Weaviate filtered by a specific date and export metadata, including source_document, to a Word document.
Args:
date (str): The date to filter by (YYYY-MM-DD format).
"""
query = f"""
{{
Get {{
MeetingDocument(where: {{
path: ["meeting_date"],
operator: Equal,
valueString: "{date}"
}}) {{
content
meeting_date
meeting_type
file_type
chunk_index
source_document
}}
}}
}}
"""
try:
print(f"Querying Weaviate for documents on {date}...")
response = client.query.raw(query)
documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", [])

if not documents:
print(f"No documents found for the date: {date}.")
return

print(f"\nRetrieved Documents for {date}:")
for doc in documents:
print(f"- Chunk Index: {doc.get('chunk_index', 'N/A')}")
print(f" Meeting Date: {doc.get('meeting_date', 'N/A')}")
print(f" Meeting Type: {doc.get('meeting_type', 'N/A')}")
print(f" File Type: {doc.get('file_type', 'N/A')}")
print(f" Source Document: {doc.get('source_document', 'N/A')}")
print(f" Content Preview: {doc.get('content', 'N/A')[:100]}...")
print()

# Export metadata to Word
print(f"Exporting metadata for {date} to Word document...")
doc = Document()
doc.add_heading(f'Document Metadata for {date}', level=1)

for doc_data in documents:
doc.add_heading(f"Chunk Index: {doc_data.get('chunk_index', 'N/A')}", level=2)
doc.add_paragraph(f"Meeting Date: {doc_data.get('meeting_date', 'N/A')}")
doc.add_paragraph(f"Meeting Type: {doc_data.get('meeting_type', 'N/A')}")
doc.add_paragraph(f"File Type: {doc_data.get('file_type', 'N/A')}")
doc.add_paragraph(f"Source Document: {doc_data.get('source_document', 'N/A')}")
doc.add_paragraph(f"Content Preview: {doc_data.get('content', 'N/A')}")
doc.add_paragraph("\n")

word_file_path = f"Weaviate_Metadata_List_{date}.docx"
doc.save(word_file_path)
print(f"Metadata exported to {word_file_path} successfully.")

except Exception as e:
print(f"Error querying Weaviate: {e}")

if __name__ == "__main__":
# Filter by specific date (YYYY-MM-DD format)
specific_date = "2000-10-27"
fetch_documents_by_date_and_export_to_word(specific_date)
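An illustrative variant (not part of this commit): accept the date on the command line instead of hard-coding it, falling back to the example date above.

import sys

if __name__ == "__main__":
    # Usage: python metadata_by_date.py 2023-12-12
    specific_date = sys.argv[1] if len(sys.argv) > 1 else "2000-10-27"
    fetch_documents_by_date_and_export_to_word(specific_date)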