Updated File Fix + Updated Env + Add a Few Tests
@wesslen brought up a great point about what happens when you want to update a document. Previously, if I uploaded an updated version of a document, the chunks from the earlier version would stay in Weaviate alongside the new ones. To address this, I’ve updated the logic to recognize when someone re-uploads the same document, matching on meeting date, meeting type, file type, and source document. The system now looks for matching chunks, reports how many exist, deletes them, and then uploads the new ones.
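
In main.py this becomes a new stage between cleaning and embedding. A minimal sketch of the flow (the helper names come from chunking_vector_embedding.py in the diff below; the example metadata values are just illustrative):

# Stage 4 (sketch): remove stale chunks for this document before re-embedding
matching_chunks = fetch_matching_chunks(
    str(metadata["meeting_date"]),   # e.g. "2023-12-12"
    metadata["meeting_type"],        # e.g. "Board of Commissioners"
    metadata["file_type"],           # e.g. "Minutes"
    clean_file_name                  # stored as source_document on each chunk
)
if matching_chunks:
    delete_matching_chunks(matching_chunks)

# Stage 5: chunk, embed, and upload the fresh version
tokenize_and_embed_text(clean_file_name, metadata)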

I’ve also updated the .env.example file to include AZURE_STORAGE_CONTAINER, which people need to configure. I forgot to include that earlier, oops!
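
For anyone wiring this up, here is a minimal sketch of how the two Azure variables are typically consumed (I’m assuming utils/azure_blob_utils.py reads them with os.getenv via python-dotenv and azure-storage-blob; the exact wiring in the repo may differ):

import os
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient

load_dotenv()  # pull the AZURE_* values from .env

blob_service = BlobServiceClient.from_connection_string(
    os.getenv("AZURE_STORAGE_CONNECTION_STRING")
)
container_client = blob_service.get_container_client(
    os.getenv("AZURE_STORAGE_CONTAINER")  # the newly documented variable
)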

Additionally, I’ve added a few tests related to this functionality (a quick usage sketch follows the list):

metadata_deletion_test.py: Allows someone to test deleting chunks based on a specific date.
metadata_by_date.py: Lets users retrieve all chunks associated with a specific date.
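
Both scripts talk to the live Weaviate instance, so they need the same .env values as the app (WEAVIATE_URL, WEAVIATE_API_KEY). A rough usage sketch for metadata_by_date.py; the date is a placeholder, and metadata_deletion_test.py follows the same pattern:

# Run from the repo root with .env populated:
#   python Preprocessing/tests/metadata_by_date.py
# or, assuming PYTHONPATH points at the Preprocessing folder, call the helper directly:
from tests.metadata_by_date import fetch_documents_by_date_and_export_to_word

fetch_documents_by_date_and_export_to_word("2023-12-12")  # placeholder date, YYYY-MM-DD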
RileyLePrell committed Dec 6, 2024
1 parent 74562ea commit d4198c5
Showing 7 changed files with 556 additions and 57 deletions.
1 change: 1 addition & 0 deletions Preprocessing/.env.example
@@ -11,6 +11,7 @@ ASSEMBLY_AI_KEY =

#Azure Storage Container Connection
AZURE_STORAGE_CONNECTION_STRING=
AZURE_STORAGE_CONTAINER =

# Pathing Setup
PYTHONPATH=
41 changes: 22 additions & 19 deletions Preprocessing/App/main.py
@@ -18,7 +18,11 @@
from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text

Ruff check failure (GitHub Actions / ruff): Preprocessing/App/main.py:18:1: E402 Module level import not at top of file
from preprocessing_pipeline.audio_transcription import transcribe_audio

Ruff check failure (GitHub Actions / ruff): Preprocessing/App/main.py:19:1: E402 Module level import not at top of file
from preprocessing_pipeline.text_cleaning import clean_text

Ruff check failure (GitHub Actions / ruff): Preprocessing/App/main.py:20:1: E402 Module level import not at top of file
from preprocessing_pipeline.chunking_vector_embedding import tokenize_and_embed_text
from preprocessing_pipeline.chunking_vector_embedding import (
tokenize_and_embed_text,
fetch_matching_chunks,
delete_matching_chunks
)

Ruff check failure (GitHub Actions / ruff): Preprocessing/App/main.py:21:1: E402 Module level import not at top of file
from utils.azure_blob_utils import (
upload_to_azure,
download_from_azure,
@@ -209,11 +213,21 @@ def upload_files_page():
upload_to_azure("clean", clean_file_name, cleaned_text)
st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}")

# Display cleaned text
st.text_area("Cleaned Text:", cleaned_text, height=200)
st.download_button("Download Cleaned Text", data=cleaned_text, file_name=clean_file_name)
# Stage 4: Check and Delete Existing Embeddings
with st.spinner("Checking for existing embeddings..."):
matching_chunks = fetch_matching_chunks(
str(metadata["meeting_date"]),
metadata["meeting_type"],
metadata["file_type"],
clean_file_name
)
if matching_chunks:
st.write(f"Found {len(matching_chunks)} existing chunks. Deleting...")
delete_matching_chunks(matching_chunks)
else:
st.write("No existing chunks found.")

# Stage 4: Chunk and Embed into Weaviate
# Stage 5: Chunk and Embed into Weaviate
with st.spinner("Chunking and embedding text into Weaviate..."):
tokenize_and_embed_text(clean_file_name, metadata)
st.success("Document processed and embedded successfully!")
Expand Down Expand Up @@ -242,33 +256,23 @@ def group_blobs_by_date(blobs):
grouped = {}
for blob in blobs:
try:
# Extract the file name without folder prefix (e.g., "raw/")
file_name = blob.split("/")[-1] # Get only the file name part

# Extract the date from the file name (assuming format: YYYY_MM_DD)
parts = file_name.split("_") # Split into ['2023', '12', '12', 'BOC', 'Agenda', ...]
file_name = blob.split("/")[-1] # Extract the file name
parts = file_name.split("_") # Split into parts: ['2023', '12', '12', 'BOC', 'Agenda', ...]
date_str = "_".join(parts[:3]) # Join the first three parts: '2023_12_12'

# Convert the date string to a readable format
readable_date = datetime.strptime(date_str, "%Y_%m_%d").strftime("%B %d, %Y")

# Group by the readable date
if readable_date not in grouped:
grouped[readable_date] = []
grouped[readable_date].append(blob)
except (ValueError, IndexError):
# Handle files with unexpected formats
if "Unknown Date" not in grouped:
grouped["Unknown Date"] = []
grouped["Unknown Date"].append(blob)
return grouped

# Group blobs by date
raw_grouped = group_blobs_by_date(raw_blobs)
dirty_grouped = group_blobs_by_date(dirty_blobs)
clean_grouped = group_blobs_by_date(clean_blobs)

# Function to display blobs within a group
def display_grouped_blobs(grouped_blobs, category):
if grouped_blobs:
st.subheader(f"{category.capitalize()} Documents")
@@ -282,7 +286,6 @@ def display_grouped_blobs(grouped_blobs, category):
else:
st.info(f"No documents found in the {category} category.")

# Display grouped blobs
display_grouped_blobs(raw_grouped, "raw")
display_grouped_blobs(dirty_grouped, "dirty")
display_grouped_blobs(clean_grouped, "clean")
@@ -308,4 +311,4 @@ def display_grouped_blobs(grouped_blobs, category):
elif st.session_state.page == "upload":
upload_files_page()
elif st.session_state.page == "view":
view_documents_page()
view_documents_page()
104 changes: 66 additions & 38 deletions Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py
@@ -23,6 +23,56 @@
# Initialize tiktoken for OpenAI's embedding model
tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")


def fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document):
"""
Fetch matching chunks from Weaviate based on metadata.
Args:
meeting_date (str): Date of the meeting.
meeting_type (str): Type of the meeting (e.g., "Board of Commissioners").
file_type (str): File type (e.g., "Minutes").
source_document (str): Name of the source document.
Returns:
list: A list of matching documents.
"""
query = f"""
{{
Get {{
MeetingDocument(where: {{
operator: And,
operands: [
{{ path: ["meeting_date"], operator: Equal, valueString: "{meeting_date}" }},
{{ path: ["meeting_type"], operator: Equal, valueString: "{meeting_type}" }},
{{ path: ["file_type"], operator: Equal, valueString: "{file_type}" }},
{{ path: ["source_document"], operator: Equal, valueString: "{source_document}" }}
]
}}) {{
_additional {{
id
}}
}}
}}
}}
"""
response = client.query.raw(query)
return response.get("data", {}).get("Get", {}).get("MeetingDocument", [])


def delete_matching_chunks(documents):
"""
Delete matching chunks from Weaviate.
Args:
documents (list): List of documents with IDs to delete.
"""
for doc in documents:
doc_id = doc["_additional"]["id"]
client.data_object.delete(doc_id)
print(f"Deleted chunk ID: {doc_id}")


def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250):
"""
Tokenizes, chunks, and embeds cleaned text into Weaviate.
@@ -33,68 +83,46 @@ def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250):
max_chunk_size (int): Maximum token size for each chunk.
"""
try:
# Step 1: Download cleaned text from Azure
# Download cleaned text from Azure
clean_text = download_from_azure("clean", clean_file_name)
print(f"Downloaded cleaned text from Azure for file: {clean_file_name}")

# Step 2: Tokenize the text using tiktoken
tokens = tokenizer.encode(clean_text)

# Step 3: Chunk tokens into groups of max_chunk_size (default: 250 tokens per chunk)
chunks = [
tokenizer.decode(tokens[i:i + max_chunk_size])
for i in range(0, len(tokens), max_chunk_size)
]
print(f"Tokenized and split text into {len(chunks)} chunks of {max_chunk_size} tokens each.")

# Extract metadata for embedding
# Metadata fields
meeting_date = str(metadata["meeting_date"])
meeting_type = metadata["meeting_type"]
file_type = metadata["file_type"]
source_document = clean_file_name

# Step 4: Check and delete existing embeddings in Weaviate (to prevent duplication)
query = f"""
{{
Get {{
MeetingDocument(where: {{
path: ["meeting_date", "meeting_type", "file_type"],
operator: And,
valueString: "{meeting_date}"
}}) {{
id
}}
}}
}}
"""
response = client.query.raw(query)
existing_documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", [])

for doc in existing_documents:
client.data_object.delete(doc["id"])
print(f"Deleted {len(existing_documents)} existing embeddings for this file.")
# Check for existing embeddings
matching_chunks = fetch_matching_chunks(meeting_date, meeting_type, file_type, source_document)
if matching_chunks:
print(f"Found {len(matching_chunks)} existing chunks. Deleting...")
delete_matching_chunks(matching_chunks)
else:
print("No existing chunks found.")

# Step 5: Embed each chunk using OpenAI and store in Weaviate
# Embed and upload each chunk
for i, chunk in enumerate(chunks):
# Generate embedding using OpenAI
response = openai_client.embeddings.create(
input=chunk,
model="text-embedding-ada-002"
)
embedding = response.data[0].embedding # Correctly access embedding from the response object
response = openai_client.embeddings.create(input=chunk, model="text-embedding-ada-002")
embedding = response.data[0].embedding

# Upload chunk to Weaviate
client.data_object.create(
data_object={
"content": chunk,
"meeting_date": meeting_date,
"meeting_type": meeting_type,
"file_type": file_type,
"chunk_index": i # Include chunk index for ordering
"chunk_index": i,
"source_document": source_document
},
vector=embedding,
class_name="MeetingDocument"
)
print(f"Uploaded chunk {i+1}/{len(chunks)} to Weaviate.")
print(f"Uploaded chunk {i + 1}/{len(chunks)} to Weaviate.")

print("Successfully processed and embedded all chunks.")

Binary file not shown.
87 changes: 87 additions & 0 deletions Preprocessing/tests/metadata_by_date.py
@@ -0,0 +1,87 @@
# This allows you to find all the chunks by a specific meeting date.

import os
import weaviate
from dotenv import load_dotenv
from docx import Document

# Load environment variables from .env
load_dotenv()

# Initialize Weaviate client
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
client = weaviate.Client(
url=WEAVIATE_URL,
auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
)

def fetch_documents_by_date_and_export_to_word(date):
"""
Fetch documents from Weaviate filtered by a specific date and export metadata, including source_document, to a Word document.
Args:
date (str): The date to filter by (YYYY-MM-DD format).
"""
query = f"""
{{
Get {{
MeetingDocument(where: {{
path: ["meeting_date"],
operator: Equal,
valueString: "{date}"
}}) {{
content
meeting_date
meeting_type
file_type
chunk_index
source_document
}}
}}
}}
"""
try:
print(f"Querying Weaviate for documents on {date}...")
response = client.query.raw(query)
documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", [])

if not documents:
print(f"No documents found for the date: {date}.")
return

print(f"\nRetrieved Documents for {date}:")
for doc in documents:
print(f"- Chunk Index: {doc.get('chunk_index', 'N/A')}")
print(f" Meeting Date: {doc.get('meeting_date', 'N/A')}")
print(f" Meeting Type: {doc.get('meeting_type', 'N/A')}")
print(f" File Type: {doc.get('file_type', 'N/A')}")
print(f" Source Document: {doc.get('source_document', 'N/A')}")
print(f" Content Preview: {doc.get('content', 'N/A')[:100]}...")
print()

# Export metadata to Word
print(f"Exporting metadata for {date} to Word document...")
doc = Document()
doc.add_heading(f'Document Metadata for {date}', level=1)

for doc_data in documents:
doc.add_heading(f"Chunk Index: {doc_data.get('chunk_index', 'N/A')}", level=2)
doc.add_paragraph(f"Meeting Date: {doc_data.get('meeting_date', 'N/A')}")
doc.add_paragraph(f"Meeting Type: {doc_data.get('meeting_type', 'N/A')}")
doc.add_paragraph(f"File Type: {doc_data.get('file_type', 'N/A')}")
doc.add_paragraph(f"Source Document: {doc_data.get('source_document', 'N/A')}")
doc.add_paragraph(f"Content Preview: {doc_data.get('content', 'N/A')}")
doc.add_paragraph("\n")

word_file_path = f"Weaviate_Metadata_List_{date}.docx"
doc.save(word_file_path)
print(f"Metadata exported to {word_file_path} successfully.")

except Exception as e:
print(f"Error querying Weaviate: {e}")

if __name__ == "__main__":
# Filter by specific date (YYYY-MM-DD format)
specific_date = "2000-10-27"
fetch_documents_by_date_and_export_to_word(specific_date)
