Skip to content

Commit

Permalink
feat: add azure blob filesystem + resourcesreader (run-llama#14379)
Browse files Browse the repository at this point in the history
* feat: add azure blob filesystem + resourcesreader

* prettier

* wip

* bump
  • Loading branch information
EmanuelCampos authored Jun 26, 2024
1 parent f89abb3 commit a26682e
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Azure Storage Blob file and directory reader.
"""
Azure Storage Blob file and directory reader.
A loader that fetches a file or iterates through a directory from Azure Storage Blob.
"""
import logging
import math
import os
from pathlib import Path
import tempfile
import time
from typing import Any, Dict, List, Optional, Union
Expand All @@ -16,12 +18,21 @@
from llama_index.core.readers import SimpleDirectoryReader
from llama_index.core.readers.base import BaseReader, BasePydanticReader
from llama_index.core.schema import Document
from llama_index.core.readers import SimpleDirectoryReader, FileSystemReaderMixin
from llama_index.core.readers.base import (
BaseReader,
BasePydanticReader,
ResourcesReaderMixin,
)

logger = logging.getLogger(__name__)


class AzStorageBlobReader(BasePydanticReader):
"""General reader for any Azure Storage Blob file or directory.
class AzStorageBlobReader(
BasePydanticReader, ResourcesReaderMixin, FileSystemReaderMixin
):
"""
General reader for any Azure Storage Blob file or directory.
Args:
container_name (str): name of the container for the blob.
Expand Down Expand Up @@ -64,102 +75,156 @@ class AzStorageBlobReader(BasePydanticReader):
def class_name(cls) -> str:
return "AzStorageBlobReader"

def load_data(self) -> List[Document]:
"""Load file(s) from Azure Storage Blob."""
def _get_container_client(self):
if self.connection_string:
container_client = ContainerClient.from_connection_string(
return ContainerClient.from_connection_string(
conn_str=self.connection_string,
container_name=self.container_name,
)
return ContainerClient(
self.account_url, self.container_name, credential=self.credential
)

def _download_files_and_extract_metadata(self, temp_dir: str) -> Dict[str, Any]:
"""Download files from Azure Storage Blob and extract metadata."""
container_client = self._get_container_client()
blob_meta = {}

if self.blob:
blobs_list = [self.blob]
else:
container_client = ContainerClient(
self.account_url, self.container_name, credential=self.credential
blobs_list = container_client.list_blobs(
self.name_starts_with, self.include
)
total_download_start_time = time.time()
blob_meta = {}

with tempfile.TemporaryDirectory() as temp_dir:
if self.blob:
blob_client = container_client.get_blob_client(self.blob)
stream = blob_client.download_blob()
sanitized_file_name = stream.name.replace("/", "-")
download_file_path = os.path.join(temp_dir, sanitized_file_name)
logger.info(f"Start download of {self.blob}")
start_time = time.time()
for obj in blobs_list:
sanitized_file_name = obj.name.replace("/", "-")
download_file_path = os.path.join(temp_dir, sanitized_file_name)
logger.info(f"Start download of {obj.name}")
start_time = time.time()
blob_client = container_client.get_blob_client(obj)
stream = blob_client.download_blob()
with open(file=download_file_path, mode="wb") as download_file:
stream.readinto(download_file)
blob_meta[sanitized_file_name] = blob_client.get_blob_properties()
end_time = time.time()
logger.debug(f"{obj.name} downloaded in {end_time - start_time} seconds.")

return blob_meta

def _extract_blob_metadata(self, file_metadata: Dict[str, Any]) -> Dict[str, Any]:
meta: dict = file_metadata

creation_time = meta.get("creation_time")
creation_time = creation_time.strftime("%Y-%m-%d") if creation_time else None

last_modified = meta.get("last_modified")
last_modified = last_modified.strftime("%Y-%m-%d") if last_modified else None

last_accessed_on = meta.get("last_accessed_on")
last_accessed_on = (
last_accessed_on.strftime("%Y-%m-%d") if last_accessed_on else None
)

extracted_meta = {
"file_name": meta.get("name"),
"file_type": meta.get("content_settings", {}).get("content_type"),
"file_size": meta.get("size"),
"creation_date": creation_time,
"last_modified_date": last_modified,
"last_accessed_date": last_accessed_on,
"container": meta.get("container"),
}

extracted_meta.update(meta.get("metadata") or {})
extracted_meta.update(meta.get("tags") or {})

return extracted_meta

def _load_documents_with_metadata(
self, files_metadata: Dict[str, Any], temp_dir: str
) -> List[Document]:
"""Load documents from a directory and extract metadata."""

def get_metadata(file_name: str) -> Dict[str, Any]:
return files_metadata.get(file_name, {})

loader = SimpleDirectoryReader(
temp_dir, file_extractor=self.file_extractor, file_metadata=get_metadata
)

return loader.load_data()

def list_resources(self, *args: Any, **kwargs: Any) -> List[str]:
"""List all the blobs in the container."""
blobs_list = self._get_container_client().list_blobs(
name_starts_with=self.name_starts_with, include=self.include
)

return [blob.name for blob in blobs_list]

def get_resource_info(self, resource_id: str, **kwargs: Any) -> Dict:
"""Get metadata for a specific blob."""
container_client = self._get_container_client()
blob_client = container_client.get_blob_client(resource_id)
blob_meta = blob_client.get_blob_properties()

info_dict = {
**self._extract_blob_metadata(blob_meta),
"file_path": str(resource_id).replace(":", "/"),
}

return {
meta_key: meta_value
for meta_key, meta_value in info_dict.items()
if meta_value is not None
}

def load_resource(self, resource_id: str, **kwargs: Any) -> List[Document]:
try:
container_client = self._get_container_client()
blob_client = container_client.get_blob_client(resource_id)
stream = blob_client.download_blob()
with tempfile.TemporaryDirectory() as temp_dir:
download_file_path = os.path.join(
temp_dir, resource_id.replace("/", "-")
)
with open(file=download_file_path, mode="wb") as download_file:
stream.readinto(download_file)
blob_meta[download_file_path] = blob_client.get_blob_properties()
end_time = time.time()
logger.info(
f"{self.blob} downloaded in {end_time - start_time} seconds."
)
# TODO: Implement an "elif" for if a pickled dictionary of the Document objects are already stored, to load that in and read into the temp directory.
# Needed because the loading of a container can take some time, and if everything is already pickled into local environment, loading it from there will be much faster.
else:
logger.info("Listing blobs")
blobs_list = container_client.list_blobs(
self.name_starts_with, self.include
return self._load_documents_with_metadata(
{resource_id: blob_client.get_blob_properties()}, temp_dir
)
for obj in blobs_list:
sanitized_file_name = obj.name.replace("/", "-")
download_file_path = os.path.join(temp_dir, sanitized_file_name)
logger.info(f"Start download of {obj.name}")
start_time = time.time()
blob_client = container_client.get_blob_client(obj)
stream = blob_client.download_blob()
with open(file=download_file_path, mode="wb") as download_file:
stream.readinto(download_file)
blob_meta[download_file_path] = blob_client.get_blob_properties()
end_time = time.time()
logger.info(
f"{obj.name} downloaded in {end_time - start_time} seconds."
)
except Exception as e:
logger.error(
f"Error loading resource {resource_id} from AzStorageBlob: {e}"
)
raise

def read_file_content(self, input_file: Path, **kwargs) -> bytes:
"""Read the content of a file from Azure Storage Blob."""
container_client = self._get_container_client()
blob_client = container_client.get_blob_client(input_file)
stream = blob_client.download_blob()
return stream.readall()

def load_data(self) -> List[Document]:
"""Load file(s) from Azure Storage Blob."""
total_download_start_time = time.time()

with tempfile.TemporaryDirectory() as temp_dir:
files_metadata = self._download_files_and_extract_metadata(temp_dir)

total_download_end_time = time.time()

total_elapsed_time = math.ceil(
total_download_end_time - total_download_start_time
)

logger.info(
f"Downloading completed in approximately {total_elapsed_time // 60}min"
f" {total_elapsed_time % 60}s."
)
logger.info("Document creation starting")

def extract_blob_meta(file_path):
meta: dict = blob_meta[file_path]

creation_time = meta.get("creation_time")
creation_time = (
creation_time.strftime("%Y-%m-%d") if creation_time else None
)

last_modified = meta.get("last_modified")
last_modified = (
last_modified.strftime("%Y-%m-%d") if last_modified else None
)

last_accessed_on = meta.get("last_accessed_on")
last_accessed_on = (
last_accessed_on.strftime("%Y-%m-%d") if last_accessed_on else None
)

extracted_meta = {
"file_name": meta.get("name"),
"file_type": meta.get("content_settings", {}).get("content_type"),
"file_size": meta.get("size"),
"creation_date": creation_time,
"last_modified_date": last_modified,
"last_accessed_date": last_accessed_on,
"container": meta.get("container"),
}
extracted_meta.update(meta.get("metadata") or {})
extracted_meta.update(meta.get("tags") or {})
return extracted_meta

loader = SimpleDirectoryReader(
temp_dir,
file_extractor=self.file_extractor,
file_metadata=extract_blob_meta,
)
logger.info("Document creation starting")

return loader.load_data()
return self._load_documents_with_metadata(files_metadata, temp_dir)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ license = "MIT"
maintainers = ["rivms"]
name = "llama-index-readers-azstorage-blob"
readme = "README.md"
version = "0.1.5"
version = "0.1.6"

[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
Expand Down

0 comments on commit a26682e

Please sign in to comment.