diff --git a/Preprocessing/App/.env.example b/Preprocessing/.env.example
similarity index 100%
rename from Preprocessing/App/.env.example
rename to Preprocessing/.env.example
diff --git a/Preprocessing/.streamlit/config.toml b/Preprocessing/.streamlit/config.toml
new file mode 100644
index 00000000..f41f6e6f
--- /dev/null
+++ b/Preprocessing/.streamlit/config.toml
@@ -0,0 +1,2 @@
+[server]
+maxUploadSize = 1000 # Set the upload size limit in MB
diff --git a/Preprocessing/App/main.py b/Preprocessing/App/main.py
new file mode 100644
index 00000000..4d8d5a2e
--- /dev/null
+++ b/Preprocessing/App/main.py
@@ -0,0 +1,287 @@
+# Standard Python imports
+import os
+import sys
+from datetime import datetime
+
+# Load environment variables and set Python path
+from dotenv import load_dotenv
+load_dotenv()
+
+# Set PYTHONPATH from .env if available
+# The value is appended to sys.path before the project-local imports further
+# down run (presumably so `preprocessing_pipeline` and `utils` resolve — confirm).
+python_path = os.getenv("PYTHONPATH")
+if python_path:
+ sys.path.append(python_path)
+
+# Import dependencies
+import streamlit as st
+import weaviate # Import Weaviate client
+from preprocessing_pipeline.pdf_conversion import convert_pdf_to_text
+from preprocessing_pipeline.audio_transcription import transcribe_audio
+from preprocessing_pipeline.text_cleaning import clean_text
+from preprocessing_pipeline.chunking_vector_embedding import tokenize_and_embed_text
+from utils.azure_blob_utils import (
+ upload_to_azure,
+ download_from_azure,
+ list_blobs_in_folder
+)
+
+# Set up Weaviate client
+# The endpoint URL and API key are read from the WEAVIATE_URL and
+# WEAVIATE_API_KEY environment variables; the client is created at import time.
+client = weaviate.Client(
+ url=os.getenv("WEAVIATE_URL"),
+ auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_API_KEY"))
+)
+
+# Helper function: Generate standardized file names
+def generate_file_name(metadata, stage):
+    """Build the standardized base file name for one pipeline stage.
+
+    Args:
+        metadata (dict): Must provide ``meeting_date`` (an object with
+            ``strftime``), ``meeting_type`` and ``file_type``.
+        stage (str): Pipeline stage label appended at the end of the name.
+
+    Returns:
+        str: ``<YYYY_MM_DD>_<BOC|PB>_<file_type>_<stage>``. Any meeting type
+        other than "Board of Commissioners" is abbreviated as "PB".
+    """
+    date_stamp = metadata["meeting_date"].strftime("%Y_%m_%d")
+    if metadata["meeting_type"] == "Board of Commissioners":
+        board_code = "BOC"
+    else:
+        board_code = "PB"
+    document_kind = metadata["file_type"]
+    return f"{date_stamp}_{board_code}_{document_kind}_{stage}"
+
+# Helper function: Check and overwrite files in local storage
+def save_file_with_overwrite(file_path, content):
+    """Write ``content`` to ``file_path``, replacing any file already there.
+
+    Args:
+        file_path (str): Destination path on the local file system.
+        content (str): Text to write.
+    """
+    # Remove first so the old file is replaced rather than written through.
+    # Attempting the removal directly avoids the race between an existence
+    # check and the removal itself.
+    try:
+        os.remove(file_path)
+    except FileNotFoundError:
+        pass
+    # Explicit encoding: transcripts may contain non-ASCII characters and the
+    # platform default encoding is not guaranteed to represent them.
+    with open(file_path, "w", encoding="utf-8") as f:
+        f.write(content)
+
+# Helper function: Fetch documents from Weaviate
+def fetch_uploaded_documents():
+ """Return the `Documents` objects from Weaviate using a raw GraphQL query.
+
+ Returns an empty list when the response has no `data.Get.Documents` entry.
+
+ NOTE(review): the embedding step in chunking_vector_embedding.py stores
+ objects in a class named `MeetingDocument`; confirm that `Documents` and
+ the properties requested here are the intended schema.
+ """
+ query = """
+ {
+ Get {
+ Documents {
+ file_name
+ file_type
+ meeting_date
+ meeting_type
+ clean_text
+ chunks
+ }
+ }
+ }
+ """
+ response = client.query.raw(query)
+ # Walk the nested response defensively; each missing level falls back to an empty value.
+ documents = response.get("data", {}).get("Get", {}).get("Documents", [])
+ return documents
+
+# Home Page
+def home_page():
+ """Render the landing page: intro markdown plus two navigation buttons.
+
+ Clicking a button stores "upload" or "view" in `st.session_state.page`.
+ """
+ # Custom styling with IBM Plex Mono
+ st.markdown("""
+
+ """, unsafe_allow_html=True)
+
+ st.markdown("""
+
+
Minute Mate
+
+ Welcome to Minute Mate; this is a staff-level application to upload meeting audios, minutes, and agendas to provide further context to the front end.
+
+
+ """, unsafe_allow_html=True)
+
+ # Navigation buttons
+ # Two equal-width columns, one button in each.
+ col1, col2 = st.columns([1, 1])
+ with col1:
+ if st.button("Upload Files", key="upload", help="Upload meeting documents and audio files"):
+ st.session_state.page = "upload"
+ with col2:
+ if st.button("View Documents", key="view", help="View the documents that have been uploaded"):
+ st.session_state.page = "view"
+
+# Upload Files Page
+def _process_uploaded_file(file, metadata, progress_bar):
+    """Run one uploaded file through the raw -> dirty -> clean -> embedded pipeline.
+
+    Processing stops (after showing an error) as soon as a stage produces no
+    text, so later stages never run against a missing intermediate file.
+
+    Args:
+        file: The uploaded file object returned by ``st.file_uploader``.
+        metadata (dict): Saved metadata (meeting_date, meeting_type, file_type,
+            model, speaker_labels).
+        progress_bar: Streamlit progress element to update.
+    """
+    # Preserve the original file extension in the stored name, but compare it
+    # case-insensitively so e.g. ".PDF" is still recognised.
+    file_extension = os.path.splitext(file.name)[1]
+    normalized_extension = file_extension.lower()
+    raw_file_name = f"{generate_file_name(metadata, 'Raw')}{file_extension}"
+
+    # Stage 1: Upload to Raw
+    upload_to_azure("raw", raw_file_name, file.read())
+    st.write(f"Uploaded file to Azure `raw/` folder: {raw_file_name}")
+    progress_bar.progress(20)
+
+    # Stage 2: Process based on file type. `dirty_file_name` stays None unless
+    # this stage actually produced and uploaded text.
+    dirty_file_name = None
+    if metadata["file_type"] == "Audio" and normalized_extension in [".mp3", ".wav"]:
+        with st.spinner(f"Transcribing audio using {metadata['model']} model..."):
+            transcribed_text = transcribe_audio(
+                raw_file_name=raw_file_name,
+                model=metadata["model"],
+                speaker_labels=metadata["speaker_labels"]
+            )
+        if transcribed_text:
+            dirty_file_name = generate_file_name(metadata, "Transcription") + ".txt"
+            upload_to_azure("dirty", dirty_file_name, transcribed_text)
+            st.write(f"Uploaded transcription to `dirty/` folder: {dirty_file_name}")
+            st.text_area("Transcribed Audio Text:", transcribed_text, height=200)
+            st.download_button("Download Transcribed Text", data=transcribed_text, file_name=dirty_file_name)
+        else:
+            st.error("Failed to transcribe the audio.")
+
+    elif metadata["file_type"] in ["Agenda", "Minutes"] and normalized_extension == ".pdf":
+        with st.spinner("Extracting text from PDF..."):
+            extracted_text = convert_pdf_to_text(raw_file_name)
+        if extracted_text:
+            dirty_file_name = generate_file_name(metadata, "TextExtraction") + ".txt"
+            upload_to_azure("dirty", dirty_file_name, extracted_text)
+            st.write(f"Uploaded extracted text to `dirty/` folder: {dirty_file_name}")
+            st.text_area("Extracted PDF Text:", extracted_text, height=200)
+            st.download_button("Download Extracted Text", data=extracted_text, file_name=dirty_file_name)
+        else:
+            st.error("Failed to extract text from the PDF.")
+
+    else:
+        st.error(
+            f"Selected file type '{metadata['file_type']}' does not match "
+            f"the uploaded file extension '{file_extension}'."
+        )
+
+    if dirty_file_name is None:
+        # Nothing was written to `dirty/`, so there is nothing to clean or embed.
+        return
+
+    # Stage 3: Clean Text and Upload to Clean
+    # clean_text() downloads the dirty file itself, so no separate download is needed here.
+    with st.spinner("Cleaning text using generative AI..."):
+        cleaned_text = clean_text(dirty_file_name)
+    clean_file_name = generate_file_name(metadata, "Cleaned") + ".txt"
+    upload_to_azure("clean", clean_file_name, cleaned_text)
+    st.write(f"Uploaded cleaned text to `clean/` folder: {clean_file_name}")
+
+    # Display cleaned text
+    st.text_area("Cleaned Text:", cleaned_text, height=200)
+    st.download_button("Download Cleaned Text", data=cleaned_text, file_name=clean_file_name)
+
+    # Stage 4: Chunk and Embed into Weaviate
+    with st.spinner("Chunking and embedding text into Weaviate..."):
+        tokenize_and_embed_text(clean_file_name, metadata)
+    st.success("Document processed and embedded successfully!")
+    progress_bar.progress(100)
+
+
+# Upload Files Page
+def upload_files_page():
+    """Render the upload page: metadata sidebar, file uploader and pipeline run.
+
+    The pipeline only runs once a file is chosen and metadata has been saved
+    to ``st.session_state["metadata"]`` via the sidebar button.
+    """
+    st.title("Upload Municipal Meeting Documents")
+
+    # Sidebar for metadata and options selection
+    st.sidebar.header("Document Metadata & Transcription Options")
+    meeting_date = st.sidebar.date_input("Select Meeting Date", datetime.today())
+    meeting_type = st.sidebar.selectbox("Meeting Type", ["Planning Board", "Board of Commissioners"])
+    file_type = st.sidebar.radio("File Type", ["Agenda", "Minutes", "Audio"])
+    model_option = st.sidebar.selectbox("Select Transcription Model", ["default", "best", "nano"])
+    speaker_labels = st.sidebar.checkbox("Enable Speaker Diarization")
+
+    # Save metadata
+    if st.sidebar.button("Save Metadata"):
+        st.session_state["metadata"] = {
+            "meeting_date": meeting_date,
+            "meeting_type": meeting_type,
+            "file_type": file_type,
+            "model": model_option,
+            "speaker_labels": speaker_labels
+        }
+
+    st.header("Upload New Document")
+    file = st.file_uploader("Choose a file to upload", type=["pdf", "mp3", "wav"])
+
+    # Initialize progress bar
+    progress_bar = st.progress(0)
+
+    if file and "metadata" in st.session_state:
+        _process_uploaded_file(file, st.session_state["metadata"], progress_bar)
+
+    # Navigation buttons
+    col1, col2 = st.columns([1, 1])
+    with col1:
+        if st.button("Return Home"):
+            st.session_state.page = "home"
+    with col2:
+        if st.button("View Documents"):
+            st.session_state.page = "view"
+
+# View Documents Page
+def _render_blob_section(title, folder, blobs):
+    """Render one storage folder's blobs, each with a download control."""
+    if not blobs:
+        return
+    st.subheader(title)
+    prefix = f"{folder}/"
+    for blob in blobs:
+        st.write(f"- {blob}")
+        if st.button(f"Download {blob}", key=f"download_{folder}_{blob}"):
+            # list_blobs_in_folder() returns full blob names that already start
+            # with "<folder>/", while download_from_azure() prepends the folder
+            # itself; strip the prefix so the blob is not requested as
+            # "<folder>/<folder>/<name>".
+            relative_name = blob[len(prefix):] if blob.startswith(prefix) else blob
+            # Download as bytes: raw blobs are binary (PDF/audio) and the
+            # download button accepts bytes, so no text decoding is needed.
+            file_content = download_from_azure(folder, relative_name, as_text=False)
+            st.download_button("Download", data=file_content, file_name=os.path.basename(blob))
+
+
+# View Documents Page
+def view_documents_page():
+    """Render the document browser for the raw, dirty and clean storage folders.
+
+    Any exception raised while listing or downloading blobs is shown as a
+    Streamlit error instead of propagating.
+    """
+    st.title("Uploaded Documents")
+    try:
+        raw_blobs = list_blobs_in_folder("raw")
+        dirty_blobs = list_blobs_in_folder("dirty")
+        clean_blobs = list_blobs_in_folder("clean")
+
+        # Display documents by category
+        _render_blob_section("Raw Documents", "raw", raw_blobs)
+        _render_blob_section("Dirty Documents", "dirty", dirty_blobs)
+        _render_blob_section("Clean Documents", "clean", clean_blobs)
+
+        if not raw_blobs and not dirty_blobs and not clean_blobs:
+            st.write("No documents found in the Azure Blob Storage.")
+    except Exception as e:
+        st.error(f"Error fetching documents from Azure Blob Storage: {e}")
+
+    # Navigation buttons
+    col1, col2 = st.columns([1, 1])
+    with col1:
+        if st.button("Return Home"):
+            st.session_state.page = "home"
+    with col2:
+        if st.button("Upload Files"):
+            st.session_state.page = "upload"
+
+# Main page selection logic
+if "page" not in st.session_state:
+    st.session_state.page = "home"
+
+# Remember which page is being drawn so a navigation click can be detected.
+_rendered_page = st.session_state.page
+if _rendered_page == "home":
+    home_page()
+elif _rendered_page == "upload":
+    upload_files_page()
+elif _rendered_page == "view":
+    view_documents_page()
+
+# Navigation buttons only update st.session_state.page after the current page
+# has already been drawn. Rerun straight away so the newly selected page
+# appears on the first click rather than on the next interaction.
+if st.session_state.page != _rendered_page:
+    # `st.rerun` on current Streamlit releases, `st.experimental_rerun` on older ones.
+    _rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun", None)
+    if _rerun is not None:
+        _rerun()
diff --git a/Preprocessing/App/requirements.txt b/Preprocessing/App/requirements.txt
deleted file mode 100644
index 00de1808..00000000
--- a/Preprocessing/App/requirements.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-# Environmental variables
-python-dotenv==1.0.0
-
-# Audio transcription
-assemblyai==0.35.1
-
-# Vector database
-weaviate-client==4.7.1
-
-# Embedding and generation services
-openai==1.54.3
\ No newline at end of file
diff --git a/Preprocessing/Dockerfile b/Preprocessing/Dockerfile
deleted file mode 100644
index 31e6b89b..00000000
--- a/Preprocessing/Dockerfile
+++ /dev/null
@@ -1,6 +0,0 @@
-FROM python:3.11-slim
-WORKDIR /App
-COPY . /App
-RUN python -m pip install -r requirements.txt
-EXPOSE 8000
-CMD [" ", "start","--port","8001","--host","0.0.0.0"]
\ No newline at end of file
diff --git a/Preprocessing/docker/Dockerfile b/Preprocessing/docker/Dockerfile
new file mode 100644
index 00000000..83035bc5
--- /dev/null
+++ b/Preprocessing/docker/Dockerfile
@@ -0,0 +1,20 @@
+# Dockerfile to set up the environment for Streamlit app
+# The COPY paths below are relative to the build context, which must be the
+# directory that contains docker/, App/, preprocessing_pipeline/ and utils/.
+
+# Use Python base image
+FROM python:3.9-slim
+
+# Set working directory
+WORKDIR /app
+
+# Make the top-level packages (preprocessing_pipeline, utils) importable
+# when Streamlit runs the script from the App/ subdirectory
+ENV PYTHONPATH=/app
+
+# Copy requirements file and install dependencies
+COPY docker/requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the application files
+COPY . .
+
+# Expose port for Streamlit
+EXPOSE 8501
+
+# Run the Streamlit application (the directory is "App"; paths are case-sensitive)
+CMD ["streamlit", "run", "App/main.py", "--server.port=8501", "--server.address=0.0.0.0"]
diff --git a/Preprocessing/docker/requirements.txt b/Preprocessing/docker/requirements.txt
new file mode 100644
index 00000000..757357bb
--- /dev/null
+++ b/Preprocessing/docker/requirements.txt
@@ -0,0 +1,24 @@
+# Environment variables management
+python-dotenv==1.0.0
+
+# Audio transcription
+assemblyai==0.35.1
+
+# HTTP client (used for the AssemblyAI REST calls in audio_transcription.py)
+requests
+
+# Vector database (Weaviate client)
+weaviate-client==4.7.1
+
+# Embedding and generation services
+openai==1.54.3
+
+# OpenAI-compatible tokenization (used by text_cleaning.py and chunking_vector_embedding.py)
+tiktoken
+
+# Streamlit for web UI
+streamlit
+
+# PDF handling (for PDF to text conversion)
+PyMuPDF
+
+# Azure Blob Storage SDK
+azure-storage-blob
+
+transformers
+
+chardet
\ No newline at end of file
diff --git a/Preprocessing/preprocessing_pipeline/__init__.py b/Preprocessing/preprocessing_pipeline/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/Preprocessing/preprocessing_pipeline/audio_transcription.py b/Preprocessing/preprocessing_pipeline/audio_transcription.py
new file mode 100644
index 00000000..9d836030
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/audio_transcription.py
@@ -0,0 +1,84 @@
+import os
+import requests
+from utils.azure_blob_utils import download_from_azure
+from utils.env_setup import load_env
+
+# Load environment variables
+load_env()
+# API key sent as the `authorization` header by transcribe_audio().
+ASSEMBLY_AI_KEY = os.getenv("ASSEMBLY_AI_KEY")
+# Base URL; the `/upload` and `/transcript` paths are appended to it.
+ASSEMBLY_AI_ENDPOINT = "https://api.assemblyai.com/v2"
+
+def transcribe_audio(raw_file_name, model=None, speaker_labels=False):
+    """
+    Transcribes an audio file using AssemblyAI.
+
+    The audio is downloaded from the `raw` folder in Azure Blob Storage,
+    uploaded to AssemblyAI, and the transcript is polled every 5 seconds
+    until it reaches a terminal status.
+
+    Parameters:
+    - raw_file_name (str): Name of the raw file in Azure Blob Storage.
+    - model (str): Transcription model to use (accepted for interface
+      compatibility; currently not sent to AssemblyAI).
+    - speaker_labels (bool): Whether to enable speaker diarization.
+
+    Returns:
+    - str: Transcribed text, or None if transcription fails.
+    """
+    import time  # local import, hoisted out of the polling loop
+
+    headers = {"authorization": ASSEMBLY_AI_KEY}
+    # (connect, read) timeouts so a stalled connection cannot hang the app forever.
+    upload_timeout = (10, 600)
+    request_timeout = (10, 60)
+    poll_interval_seconds = 5
+    try:
+        # Step 1: Download the raw audio file from Azure
+        raw_content = download_from_azure("raw", raw_file_name, as_text=False)
+        print(f"Downloaded {raw_file_name} from Azure for transcription.")
+
+        # Step 2: Upload the audio file to AssemblyAI
+        print("Uploading audio file to AssemblyAI...")
+        upload_response = requests.post(
+            f"{ASSEMBLY_AI_ENDPOINT}/upload",
+            headers=headers,
+            data=raw_content,
+            timeout=upload_timeout
+        )
+        if upload_response.status_code != 200:
+            print(f"Error uploading to AssemblyAI: {upload_response.status_code} - {upload_response.text}")
+            return None
+
+        upload_url = upload_response.json()["upload_url"]
+        print(f"File uploaded to AssemblyAI. URL: {upload_url}")
+
+        # Step 3: Request transcription
+        print("Requesting transcription from AssemblyAI...")
+        transcription_payload = {"audio_url": upload_url}
+
+        if speaker_labels:
+            transcription_payload["speaker_labels"] = True
+
+        transcription_response = requests.post(
+            f"{ASSEMBLY_AI_ENDPOINT}/transcript",
+            headers=headers,
+            json=transcription_payload,
+            timeout=request_timeout
+        )
+        if transcription_response.status_code != 200:
+            print(f"Error submitting transcription request: {transcription_response.status_code} - {transcription_response.text}")
+            return None
+
+        transcription_id = transcription_response.json()["id"]
+        print(f"Transcription request submitted. ID: {transcription_id}")
+
+        # Step 4: Poll for transcription result
+        while True:
+            status_response = requests.get(
+                f"{ASSEMBLY_AI_ENDPOINT}/transcript/{transcription_id}",
+                headers=headers,
+                timeout=request_timeout
+            )
+            status_response.raise_for_status()
+            data = status_response.json()
+            status = data["status"]
+
+            if status == "completed":
+                print("Transcription completed successfully.")
+                return data["text"]
+            # AssemblyAI reports a failed job with status "error"; "failed" is
+            # kept as well so the previous check still terminates the loop.
+            if status in ("error", "failed"):
+                print(f"Transcription failed: {data.get('error')}")
+                return None
+
+            print("Transcription in progress... Retrying in 5 seconds.")
+            time.sleep(poll_interval_seconds)
+
+    except Exception as e:
+        # Best-effort contract: report the problem and signal failure with None.
+        print(f"Error during transcription: {e}")
+        return None
diff --git a/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py
new file mode 100644
index 00000000..6e88de4c
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/chunking_vector_embedding.py
@@ -0,0 +1,102 @@
+import os
+from openai import OpenAI
+import weaviate
+import tiktoken # Use tiktoken for OpenAI-compatible tokenization
+from utils.env_setup import load_env
+from utils.azure_blob_utils import download_from_azure
+
+# Load environment variables
+load_env()
+WEAVIATE_URL = os.getenv("WEAVIATE_URL")
+WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
+
+# Initialize Weaviate client
+# NOTE: created at import time, using the URL and API key read above.
+client = weaviate.Client(
+ url=WEAVIATE_URL,
+ auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY)
+)
+
+# Initialize OpenAI client for embedding
+openai_client = OpenAI(api_key=OPENAI_API_KEY)
+
+# Initialize tiktoken for OpenAI's embedding model
+# (the same model name is passed to the embeddings call in tokenize_and_embed_text)
+tokenizer = tiktoken.encoding_for_model("text-embedding-ada-002")
+
+def _existing_chunks_query(meeting_date, meeting_type, file_type):
+    """Build the GraphQL query that finds the stored chunks of one document."""
+    import json  # local import: used only to quote the filter values safely
+
+    def equals(prop, value):
+        # json.dumps yields a correctly escaped, double-quoted string literal.
+        return f'{{path: ["{prop}"], operator: Equal, valueString: {json.dumps(value)}}}'
+
+    operands = ", ".join([
+        equals("meeting_date", meeting_date),
+        equals("meeting_type", meeting_type),
+        equals("file_type", file_type),
+    ])
+    # An `And` filter takes a list of operands (one per property), and the
+    # object id is exposed under `_additional`, not as a top-level field.
+    # The explicit limit avoids being capped by the server's default page size.
+    return (
+        "{ Get { MeetingDocument(limit: 10000, where: {operator: And, operands: ["
+        + operands
+        + "]}) { _additional { id } } } }"
+    )
+
+
+def tokenize_and_embed_text(clean_file_name, metadata, max_chunk_size=250):
+    """
+    Tokenizes, chunks, and embeds cleaned text into Weaviate.
+
+    Existing `MeetingDocument` objects with the same meeting_date,
+    meeting_type and file_type are deleted first so that re-processing a
+    document does not leave duplicate chunks behind. Errors are printed and
+    swallowed; the function returns None in every case.
+
+    Args:
+        clean_file_name (str): Name of the cleaned text file in Azure Blob Storage (clean folder).
+        metadata (dict): Metadata associated with the file (meeting_date, meeting_type, file_type).
+        max_chunk_size (int): Maximum token size for each chunk.
+    """
+    try:
+        # Step 1: Download cleaned text from Azure
+        clean_text = download_from_azure("clean", clean_file_name)
+        print(f"Downloaded cleaned text from Azure for file: {clean_file_name}")
+
+        # Step 2: Tokenize the text using tiktoken
+        tokens = tokenizer.encode(clean_text)
+
+        # Step 3: Chunk tokens into groups of max_chunk_size (default: 250 tokens per chunk)
+        chunks = [
+            tokenizer.decode(tokens[i:i + max_chunk_size])
+            for i in range(0, len(tokens), max_chunk_size)
+        ]
+        print(f"Tokenized and split text into {len(chunks)} chunks of {max_chunk_size} tokens each.")
+
+        # Extract metadata for embedding
+        meeting_date = str(metadata["meeting_date"])
+        meeting_type = metadata["meeting_type"]
+        file_type = metadata["file_type"]
+
+        # Step 4: Check and delete existing embeddings in Weaviate (to prevent duplication)
+        query = _existing_chunks_query(meeting_date, meeting_type, file_type)
+        response = client.query.raw(query)
+        # A response that carries only `errors` (e.g. the class does not exist
+        # yet) may have a missing or null `data`; treat that as "nothing stored".
+        get_section = (response.get("data") or {}).get("Get") or {}
+        existing_documents = get_section.get("MeetingDocument") or []
+
+        for doc in existing_documents:
+            client.data_object.delete(doc["_additional"]["id"], class_name="MeetingDocument")
+        print(f"Deleted {len(existing_documents)} existing embeddings for this file.")
+
+        # Step 5: Embed each chunk using OpenAI and store in Weaviate
+        for i, chunk in enumerate(chunks):
+            # Generate embedding using OpenAI
+            response = openai_client.embeddings.create(
+                input=chunk,
+                model="text-embedding-ada-002"
+            )
+            embedding = response.data[0].embedding
+
+            # Upload chunk to Weaviate
+            client.data_object.create(
+                data_object={
+                    "content": chunk,
+                    "meeting_date": meeting_date,
+                    "meeting_type": meeting_type,
+                    "file_type": file_type,
+                    "chunk_index": i  # Include chunk index for ordering
+                },
+                vector=embedding,
+                class_name="MeetingDocument"
+            )
+            print(f"Uploaded chunk {i+1}/{len(chunks)} to Weaviate.")
+
+        print("Successfully processed and embedded all chunks.")
+
+    except Exception as e:
+        print(f"Error during tokenization and embedding: {e}")
diff --git a/Preprocessing/preprocessing_pipeline/pdf_conversion.py b/Preprocessing/preprocessing_pipeline/pdf_conversion.py
new file mode 100644
index 00000000..0e23c92a
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/pdf_conversion.py
@@ -0,0 +1,31 @@
+import fitz # PyMuPDF
+from utils.azure_blob_utils import download_from_azure
+
+def convert_pdf_to_text(raw_file_name):
+    """
+    Extracts text from a PDF file.
+
+    Args:
+        raw_file_name (str): Name of the PDF file in Azure Blob Storage (raw folder).
+
+    Returns:
+        str: Extracted text from the PDF (pages concatenated in order), or
+        None if the download or the extraction fails.
+    """
+    try:
+        # Step 1: Download the raw file from Azure Blob Storage
+        raw_content = download_from_azure("raw", raw_file_name, as_text=False)
+
+        # Step 2: Open the PDF content and extract text. The context manager
+        # closes the document even if a page fails to extract.
+        with fitz.open(stream=raw_content, filetype="pdf") as pdf_document:
+            text = "".join(page.get_text() for page in pdf_document)
+
+        print(f"Successfully extracted text from {raw_file_name}.")
+        return text
+
+    except Exception as e:
+        print(f"Error extracting text from PDF {raw_file_name}: {e}")
+        return None
diff --git a/Preprocessing/preprocessing_pipeline/text_cleaning.py b/Preprocessing/preprocessing_pipeline/text_cleaning.py
new file mode 100644
index 00000000..a9912220
--- /dev/null
+++ b/Preprocessing/preprocessing_pipeline/text_cleaning.py
@@ -0,0 +1,84 @@
+import os
+from openai import OpenAI
+import tiktoken # Use tiktoken for OpenAI-compatible tokenization
+from utils.env_setup import load_env
+from utils.azure_blob_utils import download_from_azure, upload_to_azure
+
+# Load environment variables
+load_env()
+# OpenAI client used by clean_text_chunk(); the key comes from OPENAI_API_KEY.
+client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
+
+# Initialize tiktoken for OpenAI's GPT models
+tokenizer = tiktoken.encoding_for_model("gpt-3.5-turbo") # Specify the OpenAI model
+
+def tokenize_and_split_text(text, max_chunk_size=250):
+    """
+    Splits text into consecutive chunks of at most ``max_chunk_size`` tokens.
+
+    The text is encoded with the module-level tokenizer, the token sequence is
+    cut into fixed-size windows, and each window is decoded back to a string.
+
+    Args:
+        text (str): The text to split.
+        max_chunk_size (int): Maximum token size for each chunk.
+
+    Returns:
+        list of str: The decoded chunks, in their original order.
+    """
+    token_ids = tokenizer.encode(text)
+    pieces = []
+    for start in range(0, len(token_ids), max_chunk_size):
+        window = token_ids[start:start + max_chunk_size]
+        pieces.append(tokenizer.decode(window))
+    return pieces
+
+def clean_text_chunk(chunk):
+    """
+    Sends one text chunk to the OpenAI chat model and returns the cleaned version.
+
+    Args:
+        chunk (str): Text chunk to clean.
+
+    Returns:
+        str: The model's reply with surrounding whitespace stripped.
+    """
+    system_message = {
+        "role": "system",
+        "content": (
+            "The following text is a transcription of a municipal meeting for the town of Cramerton. "
+            "Please clean it for readability and correct any errors or inconsistencies."
+        ),
+    }
+    user_message = {
+        "role": "user",
+        "content": f"Clean the following text for readability: {chunk}",
+    }
+
+    completion = client.chat.completions.create(
+        model="gpt-3.5-turbo",
+        messages=[system_message, user_message],
+        max_tokens=2000,
+        temperature=0.5
+    )
+    # Only the first returned choice is used.
+    first_choice = completion.choices[0]
+    return first_choice.message.content.strip()
+
+def clean_text(dirty_file_name):
+    """
+    Downloads a dirty text file, cleans it chunk by chunk, and joins the results.
+
+    Args:
+        dirty_file_name (str): Name of the file in Azure Blob Storage (dirty folder).
+
+    Returns:
+        str: The cleaned chunks joined by a blank line.
+    """
+    print(f"Downloading {dirty_file_name} from Azure Blob Storage...")
+    raw_text = download_from_azure("dirty", dirty_file_name)
+
+    # Split into 250-token pieces so each request stays small.
+    pieces = tokenize_and_split_text(raw_text, max_chunk_size=250)
+    total = len(pieces)
+
+    results = []
+    for position, piece in enumerate(pieces, start=1):
+        print(f"Cleaning chunk {position}/{total}...")
+        results.append(clean_text_chunk(piece))
+
+    return "\n\n".join(results)
diff --git a/Preprocessing/utils/azure_blob_utils.py b/Preprocessing/utils/azure_blob_utils.py
new file mode 100644
index 00000000..34dd3569
--- /dev/null
+++ b/Preprocessing/utils/azure_blob_utils.py
@@ -0,0 +1,64 @@
+from azure.storage.blob import BlobServiceClient
+import os
+from dotenv import load_dotenv
+import chardet
+load_dotenv() # Load environment variables from .env file
+
+# Set up the blob service client
+# Connection string and container name come from the environment; the clients
+# are created at import time and shared by every helper in this module.
+connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
+container_name = os.getenv("AZURE_STORAGE_CONTAINER")
+blob_service_client = BlobServiceClient.from_connection_string(connection_string)
+container_client = blob_service_client.get_container_client(container_name)
+
+def upload_to_azure(folder_name, file_name, file_content):
+    """
+    Store content in the container as ``<folder_name>/<file_name>``.
+
+    An existing blob with the same name is overwritten.
+
+    Args:
+        folder_name (str): The folder in the Azure container (e.g., raw, dirty, clean).
+        file_name (str): The name of the file to upload.
+        file_content: The content to upload, passed through to ``upload_blob``.
+    """
+    target = f"{folder_name}/{file_name}"
+    container_client.get_blob_client(target).upload_blob(file_content, overwrite=True)
+    print(f"Uploaded to Azure: {target}")
+
+def download_from_azure(folder_name, file_name, as_text=True):
+    """
+    Download a file from Azure Blob Storage.
+
+    Args:
+        folder_name (str): The folder in the Azure container (e.g., raw, dirty, clean).
+        file_name (str): The name of the file inside that folder.
+        as_text (bool): When True, decode the bytes using the encoding detected
+            by chardet (falling back to UTF-8 when none is detected); when
+            False, return the raw bytes.
+
+    Returns:
+        str or bytes: The decoded text, or the binary content.
+
+    Raises:
+        Exception: Any download or decoding error is logged and re-raised.
+    """
+    blob_name = f"{folder_name}/{file_name}"
+    blob_client = container_client.get_blob_client(blob_name)
+
+    # Print the URL for debugging
+    print(f"Generated Blob URL: {blob_client.url}")
+
+    try:
+        downloader = blob_client.download_blob(max_concurrency=5)
+        if not as_text:
+            print(f"Downloading {blob_name} as binary.")
+            return downloader.readall()  # Return binary content
+
+        # Read as binary first and detect encoding
+        raw_data = downloader.readall()
+        detected_encoding = chardet.detect(raw_data)['encoding']
+        print(f"Detected encoding: {detected_encoding}")
+        # chardet reports None for empty or undetectable input; decoding with
+        # None would raise TypeError, so fall back to UTF-8.
+        return raw_data.decode(detected_encoding or "utf-8")
+    except Exception as e:
+        print(f"Error downloading blob {blob_name}: {e}")
+        raise  # bare raise keeps the original traceback
+
+
+def list_blobs_in_folder(folder_name):
+    """
+    Collect the names of all blobs whose name starts with ``<folder_name>/``.
+
+    Args:
+        folder_name (str): The folder to list blobs from.
+
+    Returns:
+        list: The ``name`` attribute of every matching blob.
+    """
+    prefix = f"{folder_name}/"
+    names = []
+    for entry in container_client.list_blobs(name_starts_with=prefix):
+        names.append(entry.name)
+    return names
diff --git a/Preprocessing/utils/env_setup.py b/Preprocessing/utils/env_setup.py
new file mode 100644
index 00000000..acc1b513
--- /dev/null
+++ b/Preprocessing/utils/env_setup.py
@@ -0,0 +1,12 @@
+import os
+import sys
+from dotenv import load_dotenv
+
+def load_env():
+    """
+    Loads environment variables from a .env file and adds PYTHONPATH.
+
+    PYTHONPATH may hold several entries separated by ``os.pathsep``; each
+    non-empty entry is appended to ``sys.path`` once, so calling this function
+    from several modules does not pile up duplicate entries.
+    """
+    load_dotenv()
+    python_path = os.getenv("PYTHONPATH")
+    if not python_path:
+        return
+    for entry in python_path.split(os.pathsep):
+        if entry and entry not in sys.path:
+            sys.path.append(entry)
diff --git a/Preprocessing/utils/file_utils.py b/Preprocessing/utils/file_utils.py
new file mode 100644
index 00000000..e69de29b
diff --git a/Preprocessing/utils/metadata_utils.py b/Preprocessing/utils/metadata_utils.py
new file mode 100644
index 00000000..e69de29b