Skip to content

Commit

Permalink
Fixed freshness for deltafile source
Browse files Browse the repository at this point in the history
  • Loading branch information
MatsMoll committed Mar 25, 2024
1 parent e82df1c commit 58cea10
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
12 changes: 12 additions & 0 deletions aligned/data_source/batch_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,29 @@ def __init__(self) -> None:
from aligned.sources.psql import PostgreSQLDataSource
from aligned.sources.redshift import RedshiftSQLDataSource
from aligned.sources.s3 import AwsS3CsvDataSource, AwsS3ParquetDataSource
from aligned.sources.azure_blob_storage import (
AzureBlobCsvDataSource,
AzureBlobDeltaDataSource,
AzureBlobParquetDataSource,
)
from aligned.schemas.feature_view import FeatureViewReferenceSource
from aligned.schemas.model import ModelSource

source_types = [
PostgreSQLDataSource,
# File Sources
ParquetFileSource,
CsvFileSource,
DeltaFileSource,
# Aws Sources
AwsS3CsvDataSource,
AwsS3ParquetDataSource,
RedshiftSQLDataSource,
# Azure Sources
AzureBlobCsvDataSource,
AzureBlobDeltaDataSource,
AzureBlobParquetDataSource,
# Aligned Related Sources
JoinDataSource,
JoinAsofDataSource,
FilteredDataSource,
Expand Down
20 changes: 17 additions & 3 deletions aligned/sources/azure_blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
from aligned.feature_source import WritableFeatureSource
from aligned.local.job import FileDateJob, FileFactualJob, FileFullJob
from aligned.retrival_job import RetrivalJob, RetrivalRequest
from aligned.schemas.feature import FeatureType
from aligned.sources.local import CsvConfig, DataFileReference, ParquetConfig, StorageFileReference, Directory
from aligned.schemas.feature import FeatureType, EventTimestamp
from aligned.sources.local import (
CsvConfig,
DataFileReference,
ParquetConfig,
StorageFileReference,
Directory,
data_file_freshness,
)
from aligned.storage import Storage
from httpx import HTTPStatusError

Expand Down Expand Up @@ -139,7 +146,7 @@ class AzureBlobDirectory(Directory):
sub_path: Path

# Produce a storage file reference for `path`, joined onto this directory's `sub_path`.
# NOTE(review): this span looks like a rendered diff with the +/- markers lost — the `raise`
# appears to be the removed line and the `return` the added replacement. Read literally, the
# `return` is unreachable after the `raise`. TODO confirm against the actual file before relying
# on either behavior.
def json_at(self, path: str) -> StorageFileReference:
raise NotImplementedError(type(self))
return AzureBlobDataSource(self.config, (self.sub_path / path).as_posix())

def parquet_at(self, path: str) -> AzureBlobParquetDataSource:
sub_path = self.sub_path / path
Expand Down Expand Up @@ -405,6 +412,13 @@ async def to_lazy_polars(self) -> pl.LazyFrame:
except HTTPStatusError as error:
raise UnableToFindFileException() from error

async def freshness(self, event_timestamp: EventTimestamp) -> datetime | None:
    """Best-effort freshness lookup for this source.

    Delegates to ``data_file_freshness`` with this source and the name of
    the event timestamp column.

    Args:
        event_timestamp: The event timestamp feature; only its ``name`` is used.

    Returns:
        Whatever ``data_file_freshness`` yields, or ``None`` when the lookup
        raises any ``Exception`` (the failure is logged at info level rather
        than propagated).
    """
    try:
        # Attribute access stays inside the try so that a bad argument is
        # also reported as "no freshness" instead of raising.
        latest = await data_file_freshness(self, event_timestamp.name)
    except Exception as error:
        logger.info(f"Failed to get freshness for {self.path}. {error} - returning None.")
        return None
    else:
        return latest

def features_for(self, facts: RetrivalJob, request: RetrivalRequest) -> RetrivalJob:
    """Create a ``FileFactualJob`` for ``facts`` against this source.

    Args:
        facts: The job passed through as the facts argument of the file job.
        request: The single retrieval request; it is wrapped in a
            one-element list before being handed to ``FileFactualJob``.

    Returns:
        The constructed ``FileFactualJob``.
    """
    requests = [request]
    return FileFactualJob(self, requests, facts)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aligned"
version = "0.0.85"
version = "0.0.86"
description = "A data managment and lineage tool for ML applications."
authors = ["Mats E. Mollestad <mats@mollestad.no>"]
license = "Apache-2.0"
Expand Down

0 comments on commit 58cea10

Please sign in to comment.