Tokenization + Embedder
- Added tokenizer: tiktoken
- Chunk size set to 250 tokens
- Embedder: "text-embedding-ada-002"
RileyLePrell committed Nov 19, 2024
1 parent 305cbb2 commit e6bbcfd
Showing 5 changed files with 128 additions and 75 deletions.
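In short, this commit replaces word-count chunking with token-count chunking: cleaned text is encoded with tiktoken, sliced into 250-token chunks, and each chunk is embedded with OpenAI's text-embedding-ada-002 before upload to Weaviate. A minimal sketch of that flow, assuming the tiktoken and openai packages and an OPENAI_API_KEY in the environment (the sample text is hypothetical):

import tiktoken
from openai import OpenAI

tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")
openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment

def chunk_by_tokens(text, max_chunk_size=250):
    # Encode to token IDs, slice into fixed-size windows, decode back to text.
    tokens = tokenizer.encode(text)
    return [tokenizer.decode(tokens[i:i + max_chunk_size])
            for i in range(0, len(tokens), max_chunk_size)]

for chunk in chunk_by_tokens("...a long meeting transcript..."):
    embedding = openai_client.embeddings.create(
        input=chunk,
        model="text-embedding-ada-002"
    ).data[0].embedding  # one 1536-dimensional vector per chunk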
4 changes: 2 additions & 2 deletions Preprocessing/App/main.py
@@ -20,7 +20,7 @@
from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:20:1: E402 Module level import not at top of file
from preprocessing_pipeline.audio_transcription import transcribe_audio

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:21:1: E402 Module level import not at top of file
from preprocessing_pipeline.text_cleaning import clean_text

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:22:1: E402 Module level import not at top of file
from preprocessing_pipeline.chunking_vector_embedding import process_and_embed_text
from preprocessing_pipeline.chunking_vector_embedding import tokenize_and_embed_text

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:23:1: E402 Module level import not at top of file
from utils.azure_blob_utils import upload_to_azure, download_from_azure

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:24:1: E402 Module level import not at top of file
from utils.azure_blob_utils import list_blobs_in_folder, download_from_azure

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:25:1: E402 Module level import not at top of file

Check failure (GitHub Actions / ruff): Preprocessing/App/main.py:25:58: F811 Redefinition of unused `download_from_azure` from line 24

@@ -227,7 +227,7 @@ def upload_files_page():

# Stage 4: Chunk and Embed into Weaviate
with st.spinner("Chunking and embedding text into Weaviate..."):
process_and_embed_text(clean_file_name, metadata) # Call the combined chunking and embedding function
tokenize_and_embed_text(clean_file_name, metadata) # Call the combined chunking and embedding function
st.success("Document processed and embedded successfully!")
progress_bar.progress(100)

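For reference, the renamed entry point keeps the old call shape. A hypothetical invocation, using the metadata keys named in the new docstring (the values here are invented):

metadata = {
    "meeting_date": "2024-11-19",
    "meeting_type": "city_council",
    "file_type": "minutes"
}
tokenize_and_embed_text("2024-11-19_city_council.txt", metadata)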
6 changes: 5 additions & 1 deletion Preprocessing/docker/requirements.txt
@@ -17,4 +17,8 @@ streamlit
PyMuPDF

# azure portal
azure.storage.blob
azure.storage.blob

transformers

chardet
85 changes: 57 additions & 28 deletions Preprocessing/preprocessing_pipeline/audio_transcription.py
@@ -1,54 +1,83 @@
import assemblyai as aai
from utils.azure_blob_utils import download_from_azure # For Azure integration
import os
import requests
import time
from utils.azure_blob_utils import download_from_azure
from utils.env_setup import load_env

# Load environment variables
load_env()
ASSEMBLY_AI_KEY = os.getenv("ASSEMBLY_AI_KEY")
aai.settings.api_key = ASSEMBLY_AI_KEY
transcriber = aai.Transcriber()
ASSEMBLY_AI_ENDPOINT = "https://api.assemblyai.com/v2"

def transcribe_audio(raw_file_name, model="best", speaker_labels=False):
def transcribe_audio(raw_file_name, model=None, speaker_labels=False):
"""
Transcribes an audio file using AssemblyAI directly from Azure Blob Storage.
Transcribes an audio file using AssemblyAI.
Parameters:
- raw_file_name (str): Name of the raw file in Azure Blob Storage to transcribe.
- model (str): Transcription model to use ("best" or "nano").
- raw_file_name (str): Name of the raw file in Azure Blob Storage.
- model (str): Transcription model to use (accepted for compatibility; not used by this implementation).
- speaker_labels (bool): Whether to enable speaker diarization.
Returns:
- str: Transcribed text, or None if transcription fails.
"""
headers = {"authorization": ASSEMBLY_AI_KEY}
try:
# Step 1: Download raw audio from Azure Blob Storage
# Step 1: Download the raw audio file from Azure
raw_content = download_from_azure("raw", raw_file_name, as_text=False)
print(f"Downloaded {raw_file_name} from Azure for transcription.")

# Step 2: Map transcription model
if model == "nano":
speech_model = aai.SpeechModel.nano
else:
speech_model = aai.SpeechModel.best

# Step 3: Configure transcription
config = aai.TranscriptionConfig(
speech_model=speech_model,
speaker_labels=speaker_labels
# Step 2: Upload the audio file to AssemblyAI
print("Uploading audio file to AssemblyAI...")
upload_response = requests.post(
f"{ASSEMBLY_AI_ENDPOINT}/upload",
headers=headers,
data=raw_content
)
if upload_response.status_code != 200:
print(f"Error uploading to AssemblyAI: {upload_response.status_code} - {upload_response.text}")
return None

upload_url = upload_response.json()["upload_url"]
print(f"File uploaded to AssemblyAI. URL: {upload_url}")

# Step 4: Start transcription
print("Starting transcription...")
transcript = transcriber.transcribe_audio_bytes(raw_content, config)
# Step 3: Request transcription
print("Requesting transcription from AssemblyAI...")
transcription_payload = {"audio_url": upload_url}

# Step 5: Handle response
if transcript.status == aai.TranscriptStatus.error:
print(f"Transcription error: {transcript.error}")
return None
if speaker_labels:
transcription_payload["speaker_labels"] = True

print("Transcription completed successfully.")
return transcript.text
transcription_response = requests.post(
f"{ASSEMBLY_AI_ENDPOINT}/transcript",
headers=headers,
json=transcription_payload
)
if transcription_response.status_code != 200:
print(f"Error submitting transcription request: {transcription_response.status_code} - {transcription_response.text}")
return None

transcription_id = transcription_response.json()["id"]
print(f"Transcription request submitted. ID: {transcription_id}")

# Step 4: Poll for transcription result
while True:
status_response = requests.get(
f"{ASSEMBLY_AI_ENDPOINT}/transcript/{transcription_id}",
headers=headers
)
status_response.raise_for_status()
data = status_response.json()

if data["status"] == "completed":
print("Transcription completed successfully.")
return data["text"]
elif data["status"] == "error":
print(f"Transcription failed: {data['error']}")
return None
else:
print("Transcription in progress... Retrying in 5 seconds.")
time.sleep(5)

except Exception as e:
print(f"Error during transcription: {e}")
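One caveat in the polling loop above: while True never gives up, so a stalled job would block the pipeline indefinitely. Below is a bounded variant, offered as a sketch only; wait_for_transcript, max_wait_s, and poll_s are hypothetical names, not part of this commit:

import time
import requests

ASSEMBLY_AI_ENDPOINT = "https://api.assemblyai.com/v2"

def wait_for_transcript(transcription_id, headers, max_wait_s=600, poll_s=5):
    # Poll until the job completes, errors out, or the deadline passes.
    deadline = time.monotonic() + max_wait_s
    while time.monotonic() < deadline:
        response = requests.get(
            f"{ASSEMBLY_AI_ENDPOINT}/transcript/{transcription_id}",
            headers=headers
        )
        response.raise_for_status()
        data = response.json()
        if data["status"] == "completed":
            return data["text"]
        if data["status"] == "error":
            raise RuntimeError(f"Transcription failed: {data['error']}")
        time.sleep(poll_s)
    raise TimeoutError(f"Transcript {transcription_id} still pending after {max_wait_s}s")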
72 changes: 48 additions & 24 deletions Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py
@@ -1,43 +1,58 @@
import weaviate
import os
from openai import OpenAI
import weaviate
import tiktoken # Use tiktoken for OpenAI-compatible tokenization
from utils.env_setup import load_env
from utils.azure_blob_utils import download_from_azure

# Load environment variables
load_env()
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Set up the Weaviate client with API key authentication
# Initialize Weaviate client
client = weaviate.Client(
url=WEAVIATE_URL,
auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
)

def process_and_embed_text(clean_file_name, metadata, chunk_size=500):
# Initialize OpenAI client for embedding
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Initialize tiktoken for OpenAI's embedding model
tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")

def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250):
"""
Combines chunking and embedding into a single process:
- Splits the cleaned text into smaller chunks.
- Embeds each chunk into Weaviate with metadata.
Tokenizes, chunks, and embeds cleaned text into Weaviate.
Parameters:
- clean_file_name: str, name of the cleaned text file in Azure Blob Storage (clean folder).
- metadata: dict, metadata associated with each chunk (e.g., meeting date, type, etc.).
- chunk_size: int, number of words per chunk.
Args:
clean_file_name (str): Name of the cleaned text file in Azure Blob Storage (clean folder).
metadata (dict): Metadata associated with the file (meeting_date, meeting_type, file_type).
max_chunk_size (int): Maximum token size for each chunk.
"""
try:
# Step 1: Download the cleaned text from Azure and chunk it
print(f"Downloading and chunking the text from {clean_file_name}...")
# Step 1: Download cleaned text from Azure
clean_text = download_from_azure("clean", clean_file_name)
words = clean_text.split()
chunks = [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
print(f"Downloaded cleaned text from Azure for file: {clean_file_name}")

# Extract metadata
# Step 2: Tokenize the text using tiktoken
tokens = tokenizer.encode(clean_text)

# Step 3: Chunk tokens into groups of max_chunk_size (default: 250 tokens per chunk)
chunks = [
tokenizer.decode(tokens[i:i + max_chunk_size])
for i in range(0, len(tokens), max_chunk_size)
]
print(f"Tokenized and split text into {len(chunks)} chunks of {max_chunk_size} tokens each.")

# Extract metadata for embedding
meeting_date = str(metadata["meeting_date"])
meeting_type = metadata["meeting_type"]
file_type = metadata["file_type"]

# Step 2: Check for existing documents in Weaviate with the same metadata and delete them
# Step 4: Check and delete existing embeddings in Weaviate (to prevent duplication)
query = f"""
{{
Get {{
@@ -54,25 +69,34 @@ def process_and_embed_text(clean_file_name, metadata, chunk_size=500):
response = client.query.raw(query)
existing_documents = response.get("data", {}).get("Get", {}).get("MeetingDocument", [])

# Step 3: Delete any existing documents with matching metadata in Weaviate
for doc in existing_documents:
client.data_object.delete(doc["id"])
print(f"Deleted {len(existing_documents)} existing documents with matching metadata.")
print(f"Deleted {len(existing_documents)} existing embeddings for this file.")

# Step 5: Embed each chunk using OpenAI and store in Weaviate
for i, chunk in enumerate(chunks):
# Generate embedding using OpenAI
response = openai_client.embeddings.create(
input=chunk,
model="text-embedding-ada-002"
)
embedding = response.data[0].embedding  # Extract the embedding vector from the response object

# Step 4: Embed and store new chunks in Weaviate
for chunk in chunks:
# Upload chunk to Weaviate
client.data_object.create(
data_object={
"content": chunk,
"meeting_date": meeting_date,
"meeting_type": meeting_type,
"file_type": file_type
"file_type": file_type,
"chunk_index": i # Include chunk index for ordering
},
vector=embedding,
class_name="MeetingDocument"
)
print(f"Embedded chunk for {clean_file_name} in Weaviate.")
print(f"Uploaded chunk {i+1}/{len(chunks)} to Weaviate.")

print(f"Successfully embedded {len(chunks)} chunks for {clean_file_name}.")
print("Successfully processed and embedded all chunks.")

except Exception as e:
print(f"Error during chunking and embedding: {e}")
print(f"Error during tokenization and embedding: {e}")
36 changes: 16 additions & 20 deletions Preprocessing/preprocessing_pipeline/text_cleaning.py
@@ -1,15 +1,19 @@
import os
from openai import OpenAI
import tiktoken # Use tiktoken for OpenAI-compatible tokenization
from utils.env_setup import load_env
from utils.azure_blob_utils import download_from_azure, upload_to_azure

# Load environment variables
load_env()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def split_text(text, max_chunk_size=3000):
# Initialize tiktoken for OpenAI's GPT models
tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo") # Specify the OpenAI model

def tokenize_and_split_text(text, max_chunk_size=250):
"""
Splits a large text into smaller chunks, each within the specified token size.
Tokenizes and splits text into smaller chunks within the token size limit.
Args:
text (str): The text to split.
@@ -18,22 +22,14 @@ def split_text(text, max_chunk_size=3000):
Returns:
list of str: List of smaller text chunks.
"""
chunks = []
words = text.split()
chunk = []
current_size = 0

for word in words:
current_size += len(word) + 1 # +1 accounts for spaces
if current_size > max_chunk_size:
chunks.append(" ".join(chunk))
chunk = []
current_size = len(word) + 1
chunk.append(word)

if chunk:
chunks.append(" ".join(chunk))
# Tokenize the text into tokens
tokens = tokenizer.encode(text)

# Split tokens into chunks of max_chunk_size
chunks = [
tokenizer.decode(tokens[i:i + max_chunk_size])
for i in range(0, len(tokens), max_chunk_size)
]
return chunks

def clean_text_chunk(chunk):
@@ -56,7 +52,7 @@ def clean_text_chunk(chunk):
]

response = client.chat.completions.create(
model="gpt-4",
model="gpt-3.5-turbo",
messages=messages,
max_tokens=2000,
temperature=0.5
@@ -76,8 +72,8 @@ def clean_text(dirty_file_name):
print(f"Downloading {dirty_file_name} from Azure Blob Storage...")
dirty_content = download_from_azure("dirty", dirty_file_name)

# Split the text into chunks
chunks = split_text(dirty_content, max_chunk_size=3000)
# Tokenize and split the text into chunks of 250 tokens
chunks = tokenize_and_split_text(dirty_content, max_chunk_size=250)
cleaned_chunks = []

for i, chunk in enumerate(chunks):
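A quick sanity check on the new splitter, using the tokenizer and tokenize_and_split_text defined above (the sample string is invented): decoding a token slice and re-encoding it should reproduce about max_chunk_size tokens, though counts can shift slightly at chunk boundaries where byte-pair merges differ.

sample = "Minutes of the planning board meeting. " * 300
for i, chunk in enumerate(tokenize_and_split_text(sample, max_chunk_size=250)):
    n = len(tokenizer.encode(chunk))
    print(f"chunk {i}: {n} tokens")  # expect roughly 250 for all but the last chunk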
