Minor cleanup and fixes
MatsMoll committed Oct 28, 2024
parent f845d59, commit 2961a2a
Showing 12 changed files with 35 additions and 337 deletions.
aligned/active_learning/job.py (deleted, 37 lines)

aligned/active_learning/selection.py (deleted, 58 lines)

aligned/active_learning/write_policy.py (deleted, 64 lines)

aligned/compiler/feature_factory.py (1 addition, 1 deletion)

@@ -655,7 +655,7 @@ def as_model_version(self) -> ModelVersion:


 class CouldBeEntityFeature:
-    def as_entity(self: T) -> T:
+    def as_entity(self: T) -> T:  # type: ignore
         return self.with_tag(StaticFeatureTags.is_entity)
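The new `# type: ignore` silences the checker on the fluent-mixin pattern: `self: T` with an unbound TypeVar makes the method return the concrete subclass type, but the mixin itself never declares `with_tag`, so the call cannot be verified statically. A minimal runnable sketch of the pattern, with a hypothetical `Int32Feature` and `with_tag` standing in for the real feature-factory classes:

    from typing import TypeVar

    T = TypeVar('T')

    class CouldBeEntity:
        # Fluent mixin: annotating `self: T` keeps the concrete subclass type
        # through chained calls. A checker cannot prove that `T` has a
        # `with_tag` method, which is presumably what the ignore suppresses.
        def as_entity(self: T) -> T:  # type: ignore
            return self.with_tag('is_entity')

    class Int32Feature(CouldBeEntity):
        def __init__(self) -> None:
            self.tags: list[str] = []

        def with_tag(self, tag: str) -> 'Int32Feature':
            self.tags.append(tag)
            return self

    feature = Int32Feature().as_entity()  # static type: Int32Feature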
aligned/compiler/model.py (2 additions, 1 deletion)

@@ -285,11 +285,12 @@ def join_asof(


 def resolve_dataset_store(dataset_store: DatasetStore | StorageFileReference) -> DatasetStore:
-    from aligned.schemas.folder import DatasetStore, JsonDatasetStore
+    from aligned.schemas.folder import DatasetStore, JsonDatasetStore, StorageFileSource

     if isinstance(dataset_store, DatasetStore):
         return dataset_store

+    assert isinstance(dataset_store, StorageFileSource)
     return JsonDatasetStore(dataset_store)
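The added assert narrows the remaining member of the union before constructing the JSON store, turning a silent type mismatch into an immediate `AssertionError`. A hedged sketch of the shape of this pattern, with simplified stand-in types:

    from dataclasses import dataclass

    @dataclass
    class FileSource:
        path: str

    @dataclass
    class JsonStore:
        source: FileSource

    def resolve(store: 'JsonStore | FileSource') -> JsonStore:
        if isinstance(store, JsonStore):
            return store
        # After the early return, only FileSource should remain; the assert
        # documents that and narrows the type for static checkers.
        assert isinstance(store, FileSource)
        return JsonStore(store)

    resolved = resolve(FileSource('dataset.json'))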
aligned/feature_store.py (1 addition, 1 deletion)

@@ -1931,7 +1931,7 @@ async def freshness(self) -> datetime | None:

     location = FeatureLocation.feature_view(view.name)

-    return (await self.source.freshness_for({location: view.event_timestamp}))[location]
+    return (await self.source.freshness_for({location: view.event_timestamp.as_feature()}))[location]


 class VectorIndexStore:
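The fix passes a `Feature` rather than the raw `EventTimestamp` into `freshness_for`, which apparently expects feature values keyed by location. Conceptually, a view's freshness is the most recent event timestamp in its source; a toy illustration with invented data:

    from datetime import datetime
    import polars as pl

    events = pl.DataFrame({
        'event_timestamp': [datetime(2024, 10, 1), datetime(2024, 10, 28)],
    })

    # Freshness of a feature view: the max of its event-timestamp column,
    # or None when the source is empty.
    freshness = events.select(pl.col('event_timestamp').max()).item()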
aligned/retrival_job.py (11 additions, 7 deletions)

@@ -946,9 +946,9 @@ def add_additional_features(
     elif isinstance(data, list):
         df = pl.DataFrame(data, schema_overrides=schema).lazy()
     elif isinstance(data, pl.DataFrame):
-        df = data.cast(schema).lazy()
+        df = data.cast(schema).lazy()  # type: ignore
     elif isinstance(data, pl.LazyFrame):
-        df = data.cast(schema)
+        df = data.cast(schema)  # type: ignore
     elif isinstance(data, pd.DataFrame):
         df = pl.from_pandas(data, schema_overrides=schema).lazy()
     else:
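Both `cast(schema)` calls gain a `# type: ignore`, presumably because the `{column: dtype}` mapping the code builds is typed more loosely than the stubs for `cast` accept. A condensed, hedged sketch of the dispatch this hunk lives in, with an invented schema:

    import pandas as pd
    import polars as pl

    schema = {'user_id': pl.Int64, 'score': pl.Float64}

    def to_lazy(data: object) -> pl.LazyFrame:
        # Mirrors the dispatch above: normalize dicts, polars frames, and
        # pandas frames into a LazyFrame with a single schema.
        if isinstance(data, dict):
            return pl.DataFrame(data, schema_overrides=schema).lazy()
        if isinstance(data, pl.DataFrame):
            return data.cast(schema).lazy()  # cast accepts a {column: dtype} mapping
        if isinstance(data, pl.LazyFrame):
            return data.cast(schema)
        if isinstance(data, pd.DataFrame):
            return pl.from_pandas(data, schema_overrides=schema).lazy()
        raise ValueError(f'Unsupported input type: {type(data)}')

    lazy = to_lazy({'user_id': [1, 2], 'score': [0.5, 0.9]})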
@@ -1062,14 +1062,14 @@ def polars_filter_expressions_from(features: list[Feature]) -> list[tuple[pl.Exp
     elif isinstance(constraint, MinLength):
         exprs.append(
             (
-                pl.col(feature.name).str.lengths() > constraint.value,
+                pl.col(feature.name).str.len_chars() > constraint.value,
                 f"MinLength {feature.name} {constraint.value}",
             )
         )
     elif isinstance(constraint, MaxLength):
         exprs.append(
             (
-                pl.col(feature.name).str.lengths() < constraint.value,
+                pl.col(feature.name).str.len_chars() < constraint.value,
                 f"MaxLength {feature.name} {constraint.value}",
             )
         )
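`str.lengths` was removed in newer polars in favor of the explicit `str.len_chars` (character count) and `str.len_bytes` (byte count), which differ for non-ASCII strings. A small example of the constraint expressions being built here, with invented data:

    import polars as pl

    df = pl.DataFrame({'name': ['al', 'beatrice', 'chris']})

    # MinLength builds a strict comparison, mirroring the hunk above.
    min_length_ok = pl.col('name').str.len_chars() > 3
    print(df.filter(min_length_ok))  # keeps 'beatrice' and 'chris'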
@@ -2048,7 +2048,7 @@ async def to_lazy_polars(self) -> pl.LazyFrame:

     window_data = await self.data_windows(window, data.select(required_features_name), now)

-    agg_data = window_data.lazy().groupby(window.group_by_names).agg(agg_expr).collect()
+    agg_data = window_data.lazy().group_by(window.group_by_names).agg(agg_expr).collect()
     data = data.join(agg_data, on=window.group_by_names, how='left')

     return data.lazy()
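`groupby` became `group_by` in polars 0.19; only the name changed, the aggregation semantics are the same. For instance, with toy data:

    import polars as pl

    df = pl.DataFrame({'user': ['a', 'a', 'b'], 'amount': [1.0, 2.0, 5.0]})

    # Same aggregation as before the rename: one row per group key.
    agg = df.lazy().group_by('user').agg(pl.col('amount').sum()).collect()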
@@ -2324,10 +2324,14 @@ async def to_lazy_polars(self) -> pl.LazyFrame:

     if feature.dtype == FeatureType.boolean():
         df = df.with_columns(pl.col(feature.name).cast(pl.Int8).cast(pl.Boolean))
-    elif (feature.dtype.is_array) or (feature.dtype.is_embedding):
+    elif feature.dtype.is_array:
         dtype = df.select(feature.name).dtypes[0]
         if dtype == pl.Utf8:
-            df = df.with_columns(pl.col(feature.name).str.json_extract(pl.List(pl.Utf8)))
+            df = df.with_columns(pl.col(feature.name).str.json_decode(pl.List(pl.Utf8)))
+    elif feature.dtype.is_embedding:
+        dtype = df.select(feature.name).dtypes[0]
+        if dtype == pl.Utf8:
+            df = df.with_columns(pl.col(feature.name).str.json_decode(pl.List(pl.Float64)))
     elif (feature.dtype == FeatureType.json()) or feature.dtype.is_datetime:
         logger.debug(f'Converting {feature.name} to {feature.dtype.name}')
         pass
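Besides renaming `str.json_extract` to `str.json_decode`, the hunk splits arrays from embeddings so embeddings parse into `List(Float64)` rather than `List(Utf8)`. A toy decode with invented data:

    import polars as pl

    df = pl.DataFrame({'embedding': ['[0.1, 0.2]', '[1.5, 2.5]']})

    # json_decode parses the JSON strings into the requested nested dtype.
    decoded = df.with_columns(
        pl.col('embedding').str.json_decode(pl.List(pl.Float64))
    )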
aligned/schemas/transformation.py (13 additions, 13 deletions)

@@ -1306,41 +1306,41 @@ async def transform_polars(
         case 'day':
             expr = col.day()
         case 'days':
-            expr = col.days()
+            expr = col.ordinal_day()
         case 'epoch':
             expr = col.epoch()
         case 'hour':
             expr = col.hour()
         case 'hours':
-            expr = col.hours()
+            expr = col.total_hours()
         case 'iso_year':
             expr = col.iso_year()
         case 'microsecond':
             expr = col.microsecond()
         case 'microseconds':
-            expr = col.microseconds()
+            expr = col.total_microseconds()
         case 'millisecond':
             expr = col.millisecond()
         case 'milliseconds':
-            expr = col.milliseconds()
+            expr = col.total_milliseconds()
         case 'minute':
             expr = col.minute()
         case 'minutes':
-            expr = col.minutes()
+            expr = col.total_minutes()
         case 'month':
             expr = col.month()
         case 'nanosecond':
             expr = col.nanosecond()
         case 'nanoseconds':
-            expr = col.nanoseconds()
+            expr = col.total_nanoseconds()
         case 'ordinal_day':
             expr = col.ordinal_day()
         case 'quarter':
             expr = col.quarter()
         case 'second':
             expr = col.second()
         case 'seconds':
-            expr = col.seconds()
+            expr = col.total_seconds()
         case 'week':
             expr = col.week()
         case 'weekday':
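The duration accessors are the ones renamed here: polars moved `seconds()`, `minutes()`, and friends to `total_*` names to separate whole-duration totals from datetime component extraction (`second()`, `minute()`), which is unchanged. Note also that `'days'` now maps to `ordinal_day()`, the day-of-year accessor for date columns. A small example with toy data:

    from datetime import timedelta
    import polars as pl

    df = pl.DataFrame({'elapsed': [timedelta(minutes=90), timedelta(seconds=45)]})

    # total_minutes() reports the whole duration in minutes (truncated),
    # unlike the datetime accessor minute(), which extracts a component.
    df = df.with_columns(pl.col('elapsed').dt.total_minutes().alias('minutes'))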
@@ -1540,7 +1540,7 @@ async def transform_polars(
     ) -> pl.LazyFrame | pl.Expr:
         collected = df.collect()
         pandas_column = collected.select(self.key).to_pandas()
-        transformed = await self.transform_pandas(pandas_column)
+        transformed = await self.transform_pandas(pandas_column, store)
         return collected.with_columns(pl.Series(transformed).alias(alias)).lazy()

     # @staticmethod
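The remaining edits in this file all thread the `ContractStore` through: `transform_pandas` and `transform_polars` both take `store` now, so every bridge between the two code paths must forward it. A hedged sketch of the bridging shape, with simplified types:

    import pandas as pd
    import polars as pl

    class DoubleTransformation:
        key = 'value'

        async def transform_pandas(self, df: pd.DataFrame, store: object) -> pd.Series:
            return df[self.key] * 2

        async def transform_polars(self, df: pl.LazyFrame, alias: str, store: object) -> pl.LazyFrame:
            # Bridge to the pandas implementation, forwarding `store` so both
            # code paths keep the same signature.
            collected = df.collect()
            pandas_column = collected.select(self.key).to_pandas()
            transformed = await self.transform_pandas(pandas_column, store)
            return collected.with_columns(pl.Series(transformed).alias(alias)).lazy()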
@@ -1980,7 +1980,7 @@ def grayscale(images) -> pl.Series:
             [np.mean(image, axis=2) if len(image.shape) == 3 else image for image in images.to_list()]
         )

-        return pl.col(self.image_key).map(grayscale).alias(alias)
+        return pl.col(self.image_key).map_batches(grayscale).alias(alias)


 @dataclass
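`Expr.map` was renamed `Expr.map_batches`: the function receives the whole `Series` at once, in contrast to `map_elements` further down, which runs per element. A minimal sketch with a toy function:

    import polars as pl

    def double_batch(batch: pl.Series) -> pl.Series:
        # Called once with the full Series, not once per element.
        return batch * 2

    df = pl.DataFrame({'x': [1, 2, 3]})
    df = df.with_columns(pl.col('x').map_batches(double_batch).alias('x2'))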
@@ -2087,7 +2087,7 @@ class ConcatStringAggregation(Transformation, PsqlTransformation, RedshiftTransf
     dtype = FeatureType.string()

     async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series:
-        pdf = await self.transform_polars(pl.from_pandas(df).lazy(), self.name)
+        pdf = await self.transform_polars(pl.from_pandas(df).lazy(), self.name, store)
         assert isinstance(pdf, pl.LazyFrame)
         return pdf.collect().to_pandas()[self.name]  # type: ignore

@@ -2365,7 +2365,7 @@ async def transform_polars(

         return df.with_columns(
             pl.col(self.key)
-            .apply(lambda x: s3.signed_download_url(x, max_age=self.max_age_seconds))
+            .map_elements(lambda x: s3.signed_download_url(x, max_age=self.max_age_seconds))
             .alias(alias)
         )
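`Expr.apply` is now `Expr.map_elements`; it still calls a Python function per element, which is the right (if slow) tool when each value needs an external call such as signing a URL. A hedged sketch with a placeholder signer:

    import polars as pl

    def signed_url(path: str) -> str:
        # Placeholder for a real client call like s3.signed_download_url(...).
        return f'https://storage.example.com/{path}?signature=stub'

    df = pl.DataFrame({'path': ['images/a.png', 'images/b.png']})
    df = df.with_columns(
        pl.col('path').map_elements(signed_url, return_dtype=pl.Utf8).alias('url')
    )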

@@ -2381,7 +2381,7 @@ class StructField(Transformation):

     async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series:
         data = pl.from_pandas(df).lazy()
-        tran = await self.transform_polars(data, 'feature')
+        tran = await self.transform_polars(data, 'feature', store)

         if isinstance(tran, pl.LazyFrame):
             return tran.collect().to_pandas()['feature']  # type: ignore
@@ -2392,7 +2392,7 @@ async def transform_polars(
         self, df: pl.LazyFrame, alias: str, store: ContractStore
     ) -> pl.LazyFrame | pl.Expr:
         if df.schema[self.key].is_(pl.Utf8):
-            return await JsonPath(self.key, f'$.{self.field}').transform_polars(df, alias)
+            return await JsonPath(self.key, f'$.{self.field}').transform_polars(df, alias, store)
         else:
             return pl.col(self.key).struct.field(self.field).alias(alias)
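`StructField` now forwards `store` as well, and its `else` branch shows the fast path: when the column is a real struct rather than a JSON string, `struct.field` extracts the member without any parsing. For example, with toy data:

    import polars as pl

    df = pl.DataFrame({'payload': [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]})

    # A struct column supports direct member access; only Utf8 columns need
    # the JSON-path decoding fallback taken in the branch above.
    df = df.with_columns(pl.col('payload').struct.field('a').alias('a'))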
aligned/sources/azure_blob_storage.py (3 additions, 7 deletions)

@@ -6,6 +6,7 @@
 from datetime import datetime
 from io import BytesIO
 from pathlib import Path
+from typing import TYPE_CHECKING

 import polars as pl
 from aligned.data_source.batch_data_source import CodableBatchDataSource, ColumnFeatureMappable
@@ -31,13 +32,8 @@
 from httpx import HTTPStatusError
 from aligned.lazy_imports import pandas as pd

-try:
-    from azure.storage.blob import BlobServiceClient  # type: ignore
-except ModuleNotFoundError:
-
-    class BlobServiceClient:
-        pass
-
+if TYPE_CHECKING:
+    from azure.storage.blob import BlobServiceClient

 logger = logging.getLogger(__name__)
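Replacing the try/except stub with a `TYPE_CHECKING` guard keeps `BlobServiceClient` available to the type checker without defining a fake class at runtime; the real import can then happen lazily inside the functions that use it. A minimal sketch of the pattern (the helper function is hypothetical):

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Only evaluated by static type checkers; no runtime dependency.
        from azure.storage.blob import BlobServiceClient

    def container_names(client: BlobServiceClient) -> list[str]:
        # With deferred annotations, the hint stays a string at runtime,
        # so the module imports fine even without azure-storage-blob.
        return [container.name for container in client.list_containers()]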
(Diffs for the remaining three changed files did not load.)
