diff --git a/aligned/__init__.py b/aligned/__init__.py index ea75e1d4..1158651c 100644 --- a/aligned/__init__.py +++ b/aligned/__init__.py @@ -19,6 +19,9 @@ CustomAggregation, List, Embedding, + transform_polars, + transform_row, + transform_pandas, ) from aligned.compiler.model import model_contract, FeatureInputVersions from aligned.data_source.stream_data_source import HttpStreamSource @@ -78,6 +81,10 @@ 'EmbeddingModel', 'feature_view', 'model_contract', + # Transformations + 'transform_polars', + 'transform_row', + 'transform_pandas', # Aggregation 'CustomAggregation', # Schemas diff --git a/aligned/compiler/aggregation_factory.py b/aligned/compiler/aggregation_factory.py index 4ea9bc04..dad542ae 100644 --- a/aligned/compiler/aggregation_factory.py +++ b/aligned/compiler/aggregation_factory.py @@ -363,7 +363,7 @@ def compile(self) -> Transformation: from aligned.schemas.transformation import PolarsFunctionTransformation, PolarsLambdaTransformation if isinstance(self.method, pl.Expr): - method = lambda df, alias: self.method # noqa: E731 + method = lambda df, alias, store: self.method # noqa: E731 code = '' return PolarsLambdaTransformation(method=dill.dumps(method), code=code, dtype=self.dtype.dtype) else: diff --git a/aligned/compiler/feature_factory.py b/aligned/compiler/feature_factory.py index 43658ce1..c71551c0 100644 --- a/aligned/compiler/feature_factory.py +++ b/aligned/compiler/feature_factory.py @@ -36,6 +36,7 @@ if TYPE_CHECKING: from aligned.sources.s3 import AwsS3Config + from aligned.feature_store import ContractStore class TransformationFactory: @@ -527,7 +528,9 @@ def fill_na(self: T, value: FeatureFactory | Any) -> T: return instance # type: ignore [return-value] def transformed_using_features_pandas( - self: T, using_features: list[FeatureFactory], transformation: Callable[[pd.DataFrame], pd.Series] + self: T, + using_features: list[FeatureFactory], + transformation: Callable[[pd.DataFrame, ContractStore], pd.Series], ) -> T: from aligned.compiler.transformation_factory import PandasTransformationFactory @@ -536,7 +539,9 @@ def transformed_using_features_pandas( dtype.transformation = PandasTransformationFactory(dtype, transformation, using_features or [self]) return dtype # type: ignore [return-value] - def transform_pandas(self, transformation: Callable[[pd.DataFrame], pd.Series], as_dtype: T) -> T: + def transform_pandas( + self, transformation: Callable[[pd.DataFrame, ContractStore], pd.Series], as_dtype: T + ) -> T: from aligned.compiler.transformation_factory import PandasTransformationFactory dtype: FeatureFactory = as_dtype # type: ignore [assignment] @@ -547,7 +552,7 @@ def transform_pandas(self, transformation: Callable[[pd.DataFrame], pd.Series], def transformed_using_features_polars( self: T, using_features: list[FeatureFactory], - transformation: Callable[[pl.LazyFrame, str], pl.LazyFrame] | pl.Expr, + transformation: Callable[[pl.LazyFrame, str, ContractStore], pl.LazyFrame] | pl.Expr, ) -> T: from aligned.compiler.transformation_factory import PolarsTransformationFactory @@ -1845,8 +1850,8 @@ def percentile(self, percentile: float) -> Float: def transform_polars( using_features: list[FeatureFactory], return_type: T -) -> Callable[[Callable[[Any, pl.LazyFrame, str], pl.LazyFrame]], T]: - def wrapper(method: Callable[[Any, pl.LazyFrame, str], pl.LazyFrame]) -> T: +) -> Callable[[Callable[[Any, pl.LazyFrame, str, ContractStore], pl.LazyFrame]], T]: + def wrapper(method: Callable[[Any, pl.LazyFrame, str, ContractStore], pl.LazyFrame]) -> T: return return_type.transformed_using_features_polars( using_features=using_features, transformation=method # type: ignore ) @@ -1856,8 +1861,8 @@ def wrapper(method: Callable[[Any, pl.LazyFrame, str], pl.LazyFrame]) -> T: def transform_pandas( using_features: list[FeatureFactory], return_type: T -) -> Callable[[Callable[[Any, pd.DataFrame], pd.Series]], T]: - def wrapper(method: Callable[[Any, pd.DataFrame], pd.Series]) -> T: +) -> Callable[[Callable[[Any, pd.DataFrame, ContractStore], pd.Series]], T]: + def wrapper(method: Callable[[Any, pd.DataFrame, ContractStore], pd.Series]) -> T: return return_type.transformed_using_features_pandas( using_features=using_features, transformation=method # type: ignore ) @@ -1867,8 +1872,8 @@ def wrapper(method: Callable[[Any, pd.DataFrame], pd.Series]) -> T: def transform_row( using_features: list[FeatureFactory], return_type: T -) -> Callable[[Callable[[Any, dict[str, Any]], Any]], T]: - def wrapper(method: Callable[[Any, dict[str, Any]], Any]) -> T: +) -> Callable[[Callable[[Any, dict[str, Any], ContractStore], Any]], T]: + def wrapper(method: Callable[[Any, dict[str, Any], ContractStore], Any]) -> T: from aligned.compiler.transformation_factory import MapRowTransformation new_value = return_type.copy_type() diff --git a/aligned/compiler/transformation_factory.py b/aligned/compiler/transformation_factory.py index 2d8b890c..0e70c8c3 100644 --- a/aligned/compiler/transformation_factory.py +++ b/aligned/compiler/transformation_factory.py @@ -3,7 +3,7 @@ import logging from dataclasses import dataclass, field from datetime import timedelta # noqa: TC003 -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable import polars as pl @@ -12,6 +12,9 @@ from aligned.compiler.feature_factory import FeatureFactory, Transformation, TransformationFactory from aligned.schemas.transformation import FillNaValuesColumns, LiteralValue, EmbeddingModel +if TYPE_CHECKING: + from aligned.feature_store import ContractStore + logger = logging.getLogger(__name__) @@ -683,7 +686,7 @@ def compile(self) -> Transformation: class PandasTransformationFactory(TransformationFactory): dtype: FeatureFactory - method: Callable[[pd.DataFrame], pd.Series] + method: Callable[[pd.DataFrame, ContractStore], pd.Series] _using_features: list[FeatureFactory] @property @@ -739,7 +742,7 @@ def compile(self) -> Transformation: class PolarsTransformationFactory(TransformationFactory): dtype: FeatureFactory - method: pl.Expr | Callable[[pl.LazyFrame, pl.Expr], pl.LazyFrame] + method: pl.Expr | Callable[[pl.LazyFrame, pl.Expr, ContractStore], pl.LazyFrame] _using_features: list[FeatureFactory] @property diff --git a/aligned/feature_store.py b/aligned/feature_store.py index 78b56c45..51fba80f 100644 --- a/aligned/feature_store.py +++ b/aligned/feature_store.py @@ -464,7 +464,7 @@ def features_for( return entities new_request = FeatureRequest(requests.location, requests.features_to_include, loaded_requests) - return self.features_for_request(new_request, entities, feature_names) + return self.features_for_request(new_request, entities, feature_names).inject_store(self) def model(self, model: str | ModelContractWrapper) -> ModelFeatureStore: """ @@ -485,7 +485,10 @@ def model(self, model: str | ModelContractWrapper) -> ModelFeatureStore: return ModelFeatureStore(self.models[name], self) - def vector_index(self, name: str) -> VectorIndexStore: + def vector_index(self, name: str | ModelContractWrapper) -> VectorIndexStore: + if isinstance(name, ModelContractWrapper): + name = name.location.name + return VectorIndexStore(self, self.vector_indexes[name], index_name=name) def event_triggers_for(self, feature_view: str) -> set[EventTrigger]: @@ -659,6 +662,10 @@ class MyFeatureView: if view.name in self.feature_views: raise ValueError(f'Feature view with name "{view.name}" already exists') + if isinstance(view.source, VectorIndex): + index_name = view.source.vector_index_name() or view.name + self.vector_indexes[index_name] = view + self.feature_views[view.name] = view if isinstance(self.feature_source, BatchFeatureSource): assert isinstance(self.feature_source.sources, dict) @@ -1295,7 +1302,11 @@ def all_predictions(self, limit: int | None = None) -> RetrivalJob: source = selected_source.sources[location.identifier] request = self.model.predictions_view.request(self.model.name) - return source.all_data(request, limit=limit).select_columns(set(request.all_returned_columns)) + return ( + source.all_data(request, limit=limit) + .inject_store(self.store) + .select_columns(set(request.all_returned_columns)) + ) def using_source(self, source: FeatureSourceable | BatchDataSource) -> ModelFeatureStore: @@ -1650,6 +1661,7 @@ def all_columns(self, limit: int | None = None) -> RetrivalJob: self.source.all_for(request, limit) .ensure_types(request.needed_requests) .derive_features(request.needed_requests) + .inject_store(self.store) ) if self.feature_filter: selected_columns = self.feature_filter @@ -1671,7 +1683,7 @@ def between_dates(self, start_date: datetime, end_date: datetime) -> RetrivalJob ) request = self.view.request_all - return self.source.all_between(start_date, end_date, request) + return self.source.all_between(start_date, end_date, request).inject_store(self.store) def previous(self, days: int = 0, minutes: int = 0, seconds: int = 0) -> RetrivalJob: end_date = datetime.utcnow() diff --git a/aligned/jobs/tests/test_combined_job.py b/aligned/jobs/tests/test_combined_job.py index ba194223..d7b63a60 100644 --- a/aligned/jobs/tests/test_combined_job.py +++ b/aligned/jobs/tests/test_combined_job.py @@ -1,39 +1,81 @@ import pytest from aligned import feature_view, String, Bool +from aligned.compiler.model import model_contract +from aligned.feature_store import ContractStore from aligned.sources.in_mem_source import InMemorySource from aligned.retrival_job import CombineFactualJob, RetrivalJob, RetrivalRequest -from aligned.compiler.feature_factory import transform_polars, transform_pandas, transform_row +from aligned.compiler.feature_factory import ( + Embedding, + List, + transform_polars, + transform_pandas, + transform_row, +) import polars as pl from aligned.lazy_imports import pandas as pd +@model_contract( + name='test_embedding', + input_features=[], + output_source=InMemorySource.from_values( + { + 'vec_id': ['a', 'b', 'c'], + 'value': ['hello there', 'no', 'something else'], + 'embedding': [[1, 2], [1, 0], [0, 9]], + } + ), +) +class TestEmbedding: + vec_id = String().as_entity() + value = String() + embedding = Embedding(embedding_size=2) + + @feature_view(source=InMemorySource.empty()) class CombinedData: query = String() contains_mr = query.contains('mr') + embedding = Embedding(embedding_size=2) @transform_polars(using_features=[query], return_type=Bool()) - def contains_something(self, df: pl.LazyFrame, return_value: str) -> pl.LazyFrame: + def contains_something(self, df: pl.LazyFrame, return_value: str, store: ContractStore) -> pl.LazyFrame: return df.with_columns((pl.col('query').str.len_chars() > 5).alias(return_value)) @transform_pandas(using_features=[query], return_type=String()) - def append_someting(self, df: pd.DataFrame) -> pd.Series: + def append_someting(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df['query'] + ' something' @transform_row(using_features=[query], return_type=String()) - def using_row(self, row: dict) -> str: + def using_row(self, row: dict, store: ContractStore) -> str: return row['query'] + ' something' + @transform_row(using_features=[embedding], return_type=List(String())) + async def related_entities(self, row: dict, store: ContractStore) -> list[str]: + df = ( + await store.vector_index('test_embedding') + .nearest_n_to(entities=[row], number_of_records=2) + .to_polars() + ) + print(df) + return df['vec_id'].to_list() + not_contains = contains_something.not_equals(True) @pytest.mark.asyncio async def test_feature_view_without_entity(): + store = ContractStore.empty() + store.add_model(TestEmbedding) + store.add_feature_view(CombinedData) - job = CombinedData.query().features_for({'query': ['Hello', 'Hello mr']}) + job = store.feature_view(CombinedData).features_for( + {'query': ['Hello', 'Hello mr'], 'embedding': [[1, 3], [0, 10]]} + ) df = await job.to_polars() + print(df) assert df['contains_mr'].sum() == 1 diff --git a/aligned/local/job.py b/aligned/local/job.py index 4e127c50..176dea10 100644 --- a/aligned/local/job.py +++ b/aligned/local/job.py @@ -57,6 +57,7 @@ async def to_lazy_polars(self) -> pl.LazyFrame: async def aggregate(request: RetrivalRequest, core_data: pl.LazyFrame) -> pl.LazyFrame: + from aligned import ContractStore aggregate_over = request.aggregate_over() @@ -65,7 +66,9 @@ async def aggregate(request: RetrivalRequest, core_data: pl.LazyFrame) -> pl.Laz exprs = [] for feat in aggregate_over[first_over]: - tran = await feat.derived_feature.transformation.transform_polars(core_data, feat.name) + tran = await feat.derived_feature.transformation.transform_polars( + core_data, feat.name, ContractStore.empty() + ) if not isinstance(tran, pl.Expr): raise ValueError(f'Aggregation needs to be an expression, got {tran}') @@ -86,7 +89,9 @@ async def aggregate(request: RetrivalRequest, core_data: pl.LazyFrame) -> pl.Laz for over, features in aggregate_over.items(): exprs = [] for feat in features: - tran = await feat.derived_feature.transformation.transform_polars(core_data, feat.name) + tran = await feat.derived_feature.transformation.transform_polars( + core_data, feat.name, ContractStore.empty() + ) if not isinstance(tran, pl.Expr): raise ValueError(f'Aggregation needs to be an expression, got {tran}') @@ -321,6 +326,7 @@ async def aggregate_over( event_timestamp_col: str, group_by: list[str] | None = None, ) -> pl.LazyFrame: + from aligned import ContractStore if not group_by: group_by = ['row_id'] @@ -338,7 +344,7 @@ async def aggregate_over( transformations = [] for feature in features: expr = await feature.derived_feature.transformation.transform_polars( - subset, feature.derived_feature.name + subset, feature.derived_feature.name, ContractStore.empty() ) if isinstance(expr, pl.Expr): transformations.append(expr.alias(feature.name)) diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py index 86b2794d..cc68fa7b 100644 --- a/aligned/retrival_job.py +++ b/aligned/retrival_job.py @@ -538,7 +538,9 @@ def describe(self) -> str: return f'{self.job.describe()} with target columns {self.target_columns}' -ConvertableToRetrivalJob = Union[dict[str, list], 'pd.DataFrame', pl.DataFrame, pl.LazyFrame] +ConvertableToRetrivalJob = Union[ + list[dict[str, Any]], dict[str, list], 'pd.DataFrame', pl.DataFrame, pl.LazyFrame +] class RetrivalJob(ABC): @@ -571,6 +573,11 @@ async def to_lazy_polars(self) -> pl.LazyFrame: async def to_polars(self) -> pl.DataFrame: return await (await self.to_lazy_polars()).collect_async() + def inject_store(self, store: ContractStore) -> RetrivalJob: + if isinstance(self, ModificationJob): + return self.copy_with(self.job.inject_store(store)) + return self + def describe(self) -> str: if isinstance(self, ModificationJob): return f'{self.job.describe()} -> {self.__class__.__name__}' @@ -860,6 +867,7 @@ def from_lazy_function( def from_convertable( data: ConvertableToRetrivalJob, request: list[RetrivalRequest] | RetrivalRequest | FeatureRequest ) -> RetrivalJob: + import polars as pl from aligned.local.job import LiteralRetrivalJob if isinstance(request, RetrivalRequest): @@ -868,9 +876,10 @@ def from_convertable( request = request.needed_requests if isinstance(data, dict): - return LiteralDictJob(data, request) - - if isinstance(data, pl.DataFrame): + return LiteralRetrivalJob(pl.DataFrame(data).lazy(), request) + elif isinstance(data, list): + return LiteralRetrivalJob(pl.DataFrame(data).lazy(), request) + elif isinstance(data, pl.DataFrame): return LiteralRetrivalJob(data.lazy(), request) elif isinstance(data, pl.LazyFrame): return LiteralRetrivalJob(data, request) @@ -1440,7 +1449,10 @@ async def to_lazy_polars(self) -> pl.LazyFrame: elif isinstance(self.condition, pl.Expr): col = self.condition elif isinstance(self.condition, DerivedFeature): - expr = await self.condition.transformation.transform_polars(df, self.condition.name) + from aligned.feature_store import ContractStore + + store = ContractStore.empty() + expr = await self.condition.transformation.transform_polars(df, self.condition.name, store) if isinstance(expr, pl.Expr): col = expr else: @@ -1460,7 +1472,10 @@ async def to_pandas(self) -> pd.DataFrame: if isinstance(self.condition, str): mask = df[self.condition] elif isinstance(self.condition, DerivedFeature): - mask = await self.condition.transformation.transform_pandas(df) + from aligned.feature_store import ContractStore + + store = ContractStore.empty() + mask = await self.condition.transformation.transform_pandas(df, store) elif isinstance(self.condition, Feature): mask = df[self.condition.name] else: @@ -1673,6 +1688,7 @@ class DerivedFeatureJob(RetrivalJob, ModificationJob): job: RetrivalJob requests: list[RetrivalRequest] + store: ContractStore | None = field(default=None) @property def request_result(self) -> RequestResult: @@ -1682,6 +1698,11 @@ def request_result(self) -> RequestResult: def retrival_requests(self) -> list[RetrivalRequest]: return self.job.retrival_requests + def inject_store(self, store: ContractStore) -> RetrivalJob: + job = self.copy_with(self.job.inject_store(store)) + job.store = store + return job + def filter(self, condition: str | Feature | DerivedFeature | pl.Expr) -> RetrivalJob: if isinstance(condition, str): @@ -1697,6 +1718,7 @@ def filter(self, condition: str | Feature | DerivedFeature | pl.Expr) -> Retriva return self.copy_with(self.job.filter(condition)) async def compute_derived_features_polars(self, df: pl.LazyFrame) -> pl.LazyFrame: + from aligned.feature_store import ContractStore for request in self.requests: missing_features = request.features_to_include - set(df.columns) @@ -1716,7 +1738,9 @@ async def compute_derived_features_polars(self, df: pl.LazyFrame) -> pl.LazyFram logger.debug(f'Adding feature to computation plan in polars: {feature.name}') - method = await feature.transformation.transform_polars(df, feature.name) + method = await feature.transformation.transform_polars( + df, feature.name, self.store or ContractStore.empty() + ) if isinstance(method, pl.LazyFrame): df = method elif isinstance(method, pl.Expr): @@ -1730,6 +1754,8 @@ async def compute_derived_features_polars(self, df: pl.LazyFrame) -> pl.LazyFram return df async def compute_derived_features_pandas(self, df: pd.DataFrame) -> pd.DataFrame: + from aligned.feature_store import ContractStore + for request in self.requests: for feature_round in request.derived_features_order(): for feature in feature_round: @@ -1739,7 +1765,7 @@ async def compute_derived_features_pandas(self, df: pd.DataFrame) -> pd.DataFram logger.debug(f'Computing feature with pandas: {feature.name}') df[feature.name] = await feature.transformation.transform_pandas( - df[feature.depending_on_names] # type: ignore + df[feature.depending_on_names], self.store or ContractStore.empty() # type: ignore ) return df @@ -2280,6 +2306,7 @@ class Combined(CombinedFeatureView): jobs: list[RetrivalJob] combined_requests: list[RetrivalRequest] + store: ContractStore | None = field(default=None) @property def request_result(self) -> RequestResult: @@ -2297,7 +2324,14 @@ def retrival_requests(self) -> list[RetrivalRequest]: def ignore_event_timestamp(self) -> RetrivalJob: return CombineFactualJob([job.ignore_event_timestamp() for job in self.jobs], self.combined_requests) + def inject_store(self, store: ContractStore) -> RetrivalJob: + return CombineFactualJob( + [job.inject_store(store) for job in self.jobs], self.combined_requests, store=store + ) + async def combine_data(self, df: pd.DataFrame) -> pd.DataFrame: + from aligned import ContractStore + for request in self.combined_requests: for feature in request.derived_features: if feature.name in df.columns: @@ -2305,11 +2339,13 @@ async def combine_data(self, df: pd.DataFrame) -> pd.DataFrame: continue logger.debug(f'Computing feature: {feature.name}') df[feature.name] = await feature.transformation.transform_pandas( - df[feature.depending_on_names] # type: ignore + df[feature.depending_on_names], self.store or ContractStore.empty() # type: ignore ) return df async def combine_polars_data(self, df: pl.LazyFrame) -> pl.LazyFrame: + from aligned import ContractStore + for request in self.combined_requests: logger.debug(f'{request.name}, {len(request.derived_features)}') for feature in request.derived_features: @@ -2317,7 +2353,9 @@ async def combine_polars_data(self, df: pl.LazyFrame) -> pl.LazyFrame: logger.debug(f'Skipping feature {feature.name}, already computed') continue logger.debug(f'Computing feature: {feature.name}') - result = await feature.transformation.transform_polars(df, feature.name) + result = await feature.transformation.transform_polars( + df, feature.name, self.store or ContractStore.empty() + ) if isinstance(result, pl.Expr): df = df.with_columns([result.alias(feature.name)]) elif isinstance(result, pl.LazyFrame): diff --git a/aligned/schemas/transformation.py b/aligned/schemas/transformation.py index afdac191..f308f0a7 100644 --- a/aligned/schemas/transformation.py +++ b/aligned/schemas/transformation.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from aligned.sources.s3 import AwsS3Config + from aligned.feature_store import ContractStore @dataclass @@ -75,10 +76,12 @@ class Transformation(Codable, SerializableType): name: str dtype: FeatureType - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError(type(self)) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr | pl.Expr: raise NotImplementedError(type(self)) def _serialize(self) -> dict: @@ -105,11 +108,14 @@ def test_definition() -> TransformationTestDefinition: @classmethod async def run_transformation_test_polars(cls) -> None: from polars.testing import assert_series_equal + from aligned import ContractStore try: test = cls.test_definition() alias = 'something' - output_df = await test.transformation.transform_polars(test.input_polars.lazy(), alias=alias) + output_df = await test.transformation.transform_polars( + test.input_polars.lazy(), alias=alias, store=ContractStore.empty() + ) if isinstance(output_df, pl.Expr): output_df = test.input_polars.lazy().with_columns([output_df.alias(alias)]) output = output_df.select(pl.col(alias)).collect().to_series() @@ -134,11 +140,12 @@ async def run_transformation_test_polars(cls) -> None: @classmethod async def run_transformation_test_pandas(cls) -> None: import numpy as np + from aligned import ContractStore from numpy.testing import assert_almost_equal with suppress(NotImplementedError): test = cls.test_definition() - output = await test.transformation.transform_pandas(test.input_pandas) + output = await test.transformation.transform_pandas(test.input_pandas, ContractStore.empty()) if test.transformation.dtype == FeatureType.boolean(): is_correct = np.all(output == test.output_pandas) | output.equals(test.output_pandas) assert is_correct, ( @@ -264,27 +271,30 @@ class PolarsMapRowTransformation(Transformation): dtype: FeatureType name: str = 'pol_map_row' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: - return (await self.transform_polars(pl.from_pandas(df).lazy(), 'value')).collect()[ + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: + return (await self.transform_polars(pl.from_pandas(df).lazy(), 'value', store)).collect()[ 'value' ] # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: if self.function_name not in locals(): exec(self.code) loaded = locals()[self.function_name] polars_df = df.collect() - columns = polars_df.columns - new_cols = polars_df.columns - new_cols.append(alias) + new_rows = [] - return ( - polars_df.map_rows(lambda values: (*values, loaded(dict(zip(columns, values))))) - .rename(lambda col: new_cols[int(col.split('_')[1])]) - .lazy() - ) + for row in polars_df.to_dicts(): + if asyncio.iscoroutinefunction(loaded): + row[alias] = await loaded(row, store) + else: + row[alias] = loaded(row, store) + new_rows.append(row) + + return pl.DataFrame(new_rows).lazy() @dataclass @@ -301,26 +311,28 @@ class PandasFunctionTransformation(Transformation): dtype: FeatureType name: str = 'pandas_code_tran' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: if self.function_name not in locals(): exec(self.code) loaded = locals()[self.function_name] if asyncio.iscoroutinefunction(loaded): - return await loaded(df) + return await loaded(df, store) else: - return loaded(df) + return loaded(df, store) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: pandas_df = df.collect().to_pandas() if self.function_name not in locals(): exec(self.code) loaded = locals()[self.function_name] if asyncio.iscoroutinefunction(loaded): - pandas_df[alias] = await loaded(pandas_df) + pandas_df[alias] = await loaded(pandas_df, store) else: - pandas_df[alias] = loaded(pandas_df) + pandas_df[alias] = loaded(pandas_df, store) return pl.from_pandas(pandas_df).lazy() @@ -328,7 +340,7 @@ async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | def test_definition() -> TransformationTestDefinition: return TransformationTestDefinition( transformation=PandasFunctionTransformation( - code='async def test(df):\n return df["a"] + df["b"]', + code='async def test(df, store):\n return df["a"] + df["b"]', function_name='test', dtype=FeatureType.int32(), ), @@ -348,27 +360,29 @@ class PandasLambdaTransformation(Transformation): dtype: FeatureType name: str = 'pandas_lambda_tran' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: import asyncio import dill loaded = dill.loads(self.method) if asyncio.iscoroutinefunction(loaded): - return await loaded(df) + return await loaded(df, store) else: - return loaded(df) + return loaded(df, store) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: import dill pandas_df = df.collect().to_pandas() loaded = dill.loads(self.method) if asyncio.iscoroutinefunction(loaded): - pandas_df[alias] = await loaded(pandas_df) + pandas_df[alias] = await loaded(pandas_df, store) else: - pandas_df[alias] = loaded(pandas_df) + pandas_df[alias] = loaded(pandas_df, store) return pl.from_pandas(pandas_df).lazy() @@ -387,20 +401,22 @@ class PolarsFunctionTransformation(Transformation): dtype: FeatureType name: str = 'pandas_code_tran' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: - polars_df = await self.transform_polars(pl.from_pandas(df).lazy(), self.function_name) + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: + polars_df = await self.transform_polars(pl.from_pandas(df).lazy(), self.function_name, store) assert isinstance(polars_df, pl.LazyFrame) return polars_df.collect().to_pandas()[self.function_name] # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: if self.function_name not in locals(): exec(self.code) loaded = locals()[self.function_name] if asyncio.iscoroutinefunction(loaded): - return await loaded(df, alias) + return await loaded(df, alias, store) else: - return loaded(df, alias) + return loaded(df, alias, store) @dataclass @@ -411,7 +427,7 @@ class PolarsLambdaTransformation(Transformation): dtype: FeatureType name: str = 'polars_lambda_tran' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: import dill loaded: pl.Expr = dill.loads(self.method) @@ -419,14 +435,16 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: pl_df = pl_df.with_columns((loaded).alias('polars_tran_column')) return pl_df['polars_tran_column'].to_pandas() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: import dill - tran: Callable[[pl.LazyFrame, str], pl.LazyFrame] = dill.loads(self.method) + tran = dill.loads(self.method) if isinstance(tran, pl.Expr): return tran else: - return tran(df, alias) + return tran(df, alias, store) @dataclass @@ -437,10 +455,12 @@ class NotNull(Transformation): name: str = 'not_null' dtype: FeatureType = FeatureType.boolean() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].notnull() # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns(pl.col(self.key).is_not_null().alias(alias)) @staticmethod @@ -461,10 +481,12 @@ class Equals(Transformation): name: str = 'equals_feature' dtype: FeatureType = FeatureType.boolean() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] == df[self.other_key] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) == pl.col(self.other_key) @staticmethod @@ -492,10 +514,12 @@ def __init__(self, key: str, value: LiteralValue) -> None: self.key = key self.value = value - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] == self.value.python_value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) == self.value.python_value @staticmethod @@ -520,14 +544,16 @@ def __init__(self, first_key: str, second_key: str) -> None: self.first_key = first_key self.second_key = second_key - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.first_key].isnull() | df[self.second_key].isnull()), transformation=lambda dfv: dfv[self.first_key] & dfv[self.second_key], ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns( ( pl.when(pl.col(self.first_key).is_not_null() & pl.col(self.second_key).is_not_null()) @@ -558,10 +584,12 @@ def __init__(self, first_key: str, second_key: str) -> None: self.first_key = first_key self.second_key = second_key - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns((pl.col(self.first_key) | pl.col(self.second_key)).alias(alias)) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: df[self.first_key].__invert__ return gracefull_transformation( df, @@ -589,14 +617,16 @@ class Inverse(Transformation): def __init__(self, key: str) -> None: self.key = key - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isnull()), # type: ignore transformation=lambda dfv: ~dfv[self.key].astype('bool'), # type: ignore ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns((~pl.col(self.key)).alias(alias)) @staticmethod @@ -617,10 +647,12 @@ class NotEquals(Transformation): name: str = 'not-equals-feature' dtype: FeatureType = FeatureType.boolean() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] != df[self.other_key] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) != pl.col(self.other_key) @staticmethod @@ -651,10 +683,12 @@ def __init__(self, key: str, value: Any) -> None: else: self.value = LiteralValue.from_value(value) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] != self.value.python_value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) != self.value.python_value @staticmethod @@ -675,10 +709,12 @@ class GreaterThenValue(Transformation): name: str = 'gt' dtype: FeatureType = FeatureType.boolean() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] > self.value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) > self.value @staticmethod @@ -699,10 +735,12 @@ class GreaterThen(Transformation): name: str = field(default='gtf') dtype: FeatureType = field(default=FeatureType.boolean()) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.left_key] > df[self.right_key] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.left_key) > pl.col(self.right_key) @staticmethod @@ -729,14 +767,16 @@ def __init__(self, key: str, value: float) -> None: self.key = key self.value = value - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | df[self.key].isnull()), transformation=lambda dfv: dfv[self.key] >= self.value, ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns((pl.col(self.key) >= self.value).alias(alias)) @staticmethod @@ -763,14 +803,16 @@ def __init__(self, key: str, right_col: str) -> None: self.key = key self.right_col = right_col - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | df[self.key].isnull()), transformation=lambda dfv: dfv[self.key] < dfv[self.right_col], ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns((pl.col(self.key) < pl.col(self.right_col)).alias(alias)) @staticmethod @@ -795,10 +837,12 @@ def __init__(self, key: str, right_col: str) -> None: self.key = key self.right_col = right_col - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) <= pl.col(self.right_col) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | df[self.key].isnull()), @@ -827,14 +871,16 @@ def __init__(self, key: str, value: float) -> None: self.key = key self.value = value - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | df[self.key].isnull()), transformation=lambda dfv: dfv[self.key] < self.value, ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns((pl.col(self.key) < self.value).alias(alias)) @staticmethod @@ -859,10 +905,12 @@ def __init__(self, key: str, value: float) -> None: self.key = key self.value = value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) <= self.value - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | df[self.key].isnull()), @@ -891,10 +939,12 @@ def __init__(self, front: str, behind: LiteralValue) -> None: self.front = front self.behind = behind - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.front) - pl.lit(self.behind.python_value) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.front].isna()), # type: ignore @@ -928,10 +978,12 @@ def __init__(self, front: str, behind: str) -> None: self.front = front self.behind = behind - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.front) - pl.col(self.behind) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.front].isna() | df[self.behind].isna()), @@ -961,10 +1013,12 @@ class AdditionValue(Transformation): name: str = 'add_value' dtype: FeatureType = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.feature] + self.value.python_value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.feature) + pl.lit(self.value.python_value) @staticmethod @@ -991,10 +1045,12 @@ def __init__(self, front: str, behind: str) -> None: self.front = front self.behind = behind - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.front] * df[self.behind] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.front) * pl.col(self.behind) def as_psql(self) -> str: @@ -1014,10 +1070,12 @@ def __init__(self, key: str, value: LiteralValue) -> None: self.key = key self.value = value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key) * pl.lit(self.value.python_value) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] * self.value.python_value def as_psql(self) -> str: @@ -1037,14 +1095,16 @@ def __init__(self, front: str, behind: str) -> None: self.front = front self.behind = behind - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.front].isna() | df[self.behind].isna()), transformation=lambda dfv: dfv[self.front] + dfv[self.behind], ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.front) + pl.col(self.behind) def as_psql(self) -> str: @@ -1076,14 +1136,16 @@ def __init__(self, front: str, behind: str, unit: str = 's') -> None: self.behind = behind self.unit = unit - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.front].isna() | df[self.behind].isna()), transformation=lambda dfv: (dfv[self.front] - dfv[self.behind]) / np.timedelta64(1, self.unit), # type: ignore ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns((pl.col(self.front) - pl.col(self.behind)).dt.seconds().alias(alias)) @staticmethod @@ -1126,14 +1188,16 @@ class Logarithm(Transformation): def __init__(self, key: str) -> None: self.key = key - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | (df[self.key] <= 0)), transformation=lambda dfv: np.log(dfv[self.key]), # type: ignore ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns( (pl.when(pl.col(self.key) > 0).then(pl.col(self.key).log()).otherwise(pl.lit(None))).alias(alias) ) @@ -1158,14 +1222,16 @@ class LogarithmOnePluss(Transformation): def __init__(self, key: str) -> None: self.key = key - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna() | (df[self.key] <= -1)), transformation=lambda dfv: np.log1p(dfv[self.key]), # type: ignore ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns( (pl.when(pl.col(self.key) > -1).then((pl.col(self.key) + 1).log()).otherwise(pl.lit(None))).alias( alias @@ -1194,12 +1260,14 @@ class ToNumerical(Transformation): def __init__(self, key: str) -> None: self.key = key - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from pandas import to_numeric return to_numeric(df[self.key], errors='coerce') # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).cast(pl.Float64) @staticmethod @@ -1226,7 +1294,7 @@ def __init__(self, key: str, component: str) -> None: self.key = key self.component = component - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, @@ -1234,7 +1302,9 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: transformation=lambda dfv: getattr(dfv[self.key].dt, self.component), # type: ignore ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: col = pl.col(self.key).cast(pl.Datetime).dt match self.component: case 'day': @@ -1319,10 +1389,12 @@ class ArrayAtIndex(Transformation): name: str = 'array_at_index' dtype: FeatureType = FeatureType.boolean() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return pl.Series(df[self.key]).list.get(self.index).to_pandas() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).list.get(self.index).alias(alias) @staticmethod @@ -1355,10 +1427,12 @@ def __init__(self, key: str, value: Any | LiteralValue) -> None: else: self.value = LiteralValue.from_value(value) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return pl.Series(df[self.key]).list.contains(self.value.python_value).to_pandas() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).list.contains(self.value.python_value) @staticmethod @@ -1388,14 +1462,16 @@ def __init__(self, key: str, value: str) -> None: self.key = key self.value = value - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.key].isna()), # type: ignore transformation=lambda dfv: dfv[self.key].astype('str').str.contains(self.value), # type: ignore ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).str.contains(self.value) @staticmethod @@ -1424,10 +1500,12 @@ def __init__(self, key: str, orders: list[str]) -> None: self.key = key self.orders = orders - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].map(self.orders_dict) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: mapper = pl.DataFrame({self.key: list(self.orders), alias: list(range(0, len(self.orders)))}) return df.join(mapper.lazy(), on=self.key, how='left') @@ -1455,7 +1533,7 @@ def __init__(self, key: str, values: list[tuple[str, str]]) -> None: self.key = key self.values = values - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: temp_df = df[self.key].copy() mask = ~(df[self.key].isna() | df[self.key].isnull()) temp_df.loc[~mask] = np.nan @@ -1464,7 +1542,9 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return temp_df # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: collected = df.collect() pandas_column = collected.select(self.key).to_pandas() transformed = await self.transform_pandas(pandas_column) @@ -1500,7 +1580,7 @@ def __init__(self, numerator: str, denumerator: str) -> None: self.numerator = numerator self.denumerator = denumerator - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~( @@ -1510,7 +1590,9 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: / dfv[self.denumerator].astype(float), ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return ( pl.when(pl.col(self.denumerator) != 0) .then(pl.col(self.numerator) / pl.col(self.denumerator)) @@ -1542,14 +1624,16 @@ def __init__(self, numerator: str, denumerator: LiteralValue) -> None: self.denumerator = denumerator assert denumerator.python_value != 0 - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return gracefull_transformation( df, is_valid_mask=~(df[self.numerator].isna()), # type: ignore transformation=lambda dfv: dfv[self.numerator].astype(float) / self.denumerator.python_value, ) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.numerator) / pl.lit(self.denumerator.python_value) @staticmethod @@ -1572,10 +1656,12 @@ class IsIn(Transformation): name = 'isin' dtype = FeatureType.boolean() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].isin(self.values) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).is_in(self.values) @staticmethod @@ -1596,10 +1682,12 @@ class FillNaValuesColumns(Transformation): name: str = 'fill_missing_key' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].fillna(df[self.fill_key]) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: if self.dtype == FeatureType.floating_point(): return pl.col(self.key).fill_nan(pl.col(self.fill_key)).fill_null(pl.col(self.fill_key)) @@ -1627,10 +1715,12 @@ class FillNaValues(Transformation): name: str = 'fill_missing' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].fillna(self.value.python_value) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: if self.dtype == FeatureType.floating_point(): return pl.col(self.key).fill_nan(self.value.python_value).fill_null(self.value.python_value) @@ -1656,10 +1746,12 @@ class CopyTransformation(Transformation): name: str = 'nothing' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).alias(alias) @@ -1671,12 +1763,14 @@ class Floor(Transformation): name: str = 'floor' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from numpy import floor return floor(df[self.key]) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).floor().alias(alias) @staticmethod @@ -1696,12 +1790,14 @@ class Ceil(Transformation): name: str = 'ceil' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from numpy import ceil return ceil(df[self.key]) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).ceil().alias(alias) @staticmethod @@ -1721,12 +1817,14 @@ class Round(Transformation): name: str = 'round' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from numpy import round return round(df[self.key]) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).round(0).alias(alias) @staticmethod @@ -1746,12 +1844,14 @@ class Absolute(Transformation): name: str = 'abs' - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from numpy import abs return abs(df[self.key]) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).abs().alias(alias) @staticmethod @@ -1773,11 +1873,13 @@ class MapArgMax(Transformation): def dtype(self) -> FeatureType: # type: ignore return list(self.column_mappings.values())[0].dtype - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: - pl_df = await self.transform_polars(pl.from_pandas(df).lazy(), 'feature') + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: + pl_df = await self.transform_polars(pl.from_pandas(df).lazy(), 'feature', store) return pl_df.collect().to_pandas()['feature'] # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: expr: pl.Expr = pl.lit(None) if len(self.column_mappings) == 1: @@ -1831,10 +1933,12 @@ class WordVectoriser(Transformation): name = 'word_vectoriser' dtype = FeatureType.embedding(768) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return await self.model.vectorise_pandas(df[self.key]) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return await self.model.vectorise_polars(df, self.key, alias) @@ -1846,7 +1950,9 @@ class LoadImageUrl(Transformation): name = 'load_image' dtype = FeatureType.array() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: import asyncio from io import BytesIO @@ -1871,7 +1977,9 @@ class GrayscaleImage(Transformation): name = 'grayscale_image' dtype = FeatureType.array() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: import numpy as np def grayscale(images) -> pl.Series: @@ -1890,10 +1998,12 @@ class Power(Transformation): name = 'power' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] ** self.power.python_value - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).pow(self.power.python_value) @@ -1905,10 +2015,12 @@ class PowerFeature(Transformation): name = 'power_feat' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] ** df[self.power_key] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).pow(pl.col(self.power_key)) @@ -1921,10 +2033,12 @@ class AppendConstString(Transformation): name = 'append_const_string' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key] + self.string - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.concat_str([pl.col(self.key).fill_null(''), pl.lit(self.string)], separator='').alias(alias) @@ -1938,10 +2052,12 @@ class AppendStrings(Transformation): name = 'append_strings' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.first_key] + self.sep + df[self.second_key] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns( pl.concat_str( [pl.col(self.first_key).fill_null(''), pl.col(self.second_key).fill_null('')], @@ -1959,10 +2075,12 @@ class PrependConstString(Transformation): name = 'prepend_const_string' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return self.string + df[self.key] - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.concat_str([pl.lit(self.string), pl.col(self.key).fill_null('')], separator='').alias(alias) @@ -1975,12 +2093,14 @@ class ConcatStringAggregation(Transformation, PsqlTransformation, RedshiftTransf name = 'concat_string_agg' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: pdf = await self.transform_polars(pl.from_pandas(df).lazy(), self.name) assert isinstance(pdf, pl.LazyFrame) return pdf.collect().to_pandas()[self.name] # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return df.with_columns(pl.concat_str(pl.col(self.key), separator=self.separator).alias(alias)) def as_psql(self) -> str: @@ -1998,10 +2118,12 @@ class SumAggregation(Transformation, PsqlTransformation, RedshiftTransformation) name = 'sum_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.sum(self.key) def as_psql(self) -> str: @@ -2016,10 +2138,12 @@ class MeanAggregation(Transformation, PsqlTransformation, RedshiftTransformation name = 'mean_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).mean() def as_psql(self) -> str: @@ -2034,10 +2158,12 @@ class MinAggregation(Transformation, PsqlTransformation, RedshiftTransformation) name = 'min_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).min() def as_psql(self) -> str: @@ -2052,10 +2178,12 @@ class MaxAggregation(Transformation, PsqlTransformation, RedshiftTransformation) name = 'max_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).max() def as_psql(self) -> str: @@ -2070,10 +2198,12 @@ class CountAggregation(Transformation, PsqlTransformation, RedshiftTransformatio name = 'count_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).count() def as_psql(self) -> str: @@ -2088,10 +2218,12 @@ class CountDistinctAggregation(Transformation, PsqlTransformation, RedshiftTrans name = 'count_distinct_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).unique_counts() def as_psql(self) -> str: @@ -2106,10 +2238,12 @@ class StdAggregation(Transformation, PsqlTransformation, RedshiftTransformation) name = 'std_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).std() def as_psql(self) -> str: @@ -2124,10 +2258,12 @@ class VarianceAggregation(Transformation, PsqlTransformation, RedshiftTransforma name = 'var_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).var() def as_psql(self) -> str: @@ -2142,10 +2278,12 @@ class MedianAggregation(Transformation, PsqlTransformation, RedshiftTransformati name = 'median_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).median() def as_psql(self) -> str: @@ -2161,10 +2299,12 @@ class PercentileAggregation(Transformation, PsqlTransformation, RedshiftTransfor name = 'percentile_agg' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: raise NotImplementedError() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).quantile(self.percentile) def as_psql(self) -> str: @@ -2181,10 +2321,12 @@ class Clip(Transformation, PsqlTransformation, RedshiftTransformation): name = 'clip' dtype = FeatureType.floating_point() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].clip(lower=self.lower.python_value, upper=self.upper.python_value) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).clip(lower_bound=self.lower.python_value, upper_bound=self.upper.python_value) def as_psql(self) -> str: @@ -2213,14 +2355,16 @@ class PresignedAwsUrl(Transformation): name = 'presigned_aws_url' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from aioaws.s3 import S3Client from httpx import AsyncClient s3 = S3Client(AsyncClient(), config=self.config.s3_config) return df[self.key].apply(lambda x: s3.signed_download_url(x, max_age=self.max_age_seconds)) # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: from aioaws.s3 import S3Client from httpx import AsyncClient @@ -2242,7 +2386,7 @@ class StructField(Transformation): name = 'struct_field' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: data = pl.from_pandas(df).lazy() tran = await self.transform_polars(data, 'feature') @@ -2251,7 +2395,9 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return data.select(tran).collect().to_pandas()['feature'] # type: ignore - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: if df.schema[self.key].is_(pl.Utf8): return await JsonPath(self.key, f'$.{self.field}').transform_polars(df, alias) else: @@ -2269,7 +2415,7 @@ class OllamaGenerate(Transformation): name = 'ollama_embedding' dtype = FeatureType.json() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from ollama import AsyncClient import os @@ -2290,7 +2436,9 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return response - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: def generate_embedding(values: pl.Series) -> pl.Series: from ollama import Client import os @@ -2327,7 +2475,7 @@ class OllamaEmbedding(Transformation): name = 'ollama_embedding' dtype = FeatureType.embedding(768) - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: from ollama import AsyncClient import os @@ -2345,7 +2493,9 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return response - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: def generate_embedding(values: pl.Series) -> pl.Series: from ollama import Client import os @@ -2371,10 +2521,12 @@ class JsonPath(Transformation): name = 'json_path' dtype = FeatureType.string() - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return pl.Series(df[self.key]).str.json_path_match(self.path).to_pandas() - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr: return pl.col(self.key).str.json_path_match(self.path).alias(alias) @@ -2384,8 +2536,10 @@ class Split(Transformation): key: str separator: str - async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: + async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series: return df[self.key].str.split(self.separator) - async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr | pl.Expr: + async def transform_polars( + self, df: pl.LazyFrame, alias: str, store: ContractStore + ) -> pl.LazyFrame | pl.Expr | pl.Expr: return pl.col(self.key).str.split(self.separator) diff --git a/aligned/sources/in_mem_source.py b/aligned/sources/in_mem_source.py index 039333fd..4ec6bcb2 100644 --- a/aligned/sources/in_mem_source.py +++ b/aligned/sources/in_mem_source.py @@ -7,18 +7,24 @@ from aligned.data_source.batch_data_source import BatchDataSource, CodableBatchDataSource from aligned.feature_source import WritableFeatureSource from aligned.retrival_job import RetrivalJob, RetrivalRequest +from aligned.schemas.feature import Feature +from aligned.sources.vector_index import VectorIndex if TYPE_CHECKING: from aligned.schemas.feature_view import CompiledFeatureView -class InMemorySource(CodableBatchDataSource, DataFileReference, WritableFeatureSource): +class InMemorySource(CodableBatchDataSource, DataFileReference, WritableFeatureSource, VectorIndex): type_name = 'in_mem_source' def __init__(self, data: pl.DataFrame) -> None: self.data = data self.job_key = str(uuid.uuid4()) + self._vector_index_name = None + + def vector_index_name(self) -> str | None: + return self._vector_index_name def job_group_key(self) -> str: return self.job_key @@ -46,7 +52,82 @@ async def overwrite(self, job: RetrivalJob, request: RetrivalRequest) -> None: async def write_polars(self, df: pl.LazyFrame) -> None: self.data = df.collect() + def nearest_n_to( + self, data: RetrivalJob, number_of_records: int, request: RetrivalRequest + ) -> RetrivalJob: + from aligned.retrival_job import RetrivalJob + + print(request.features_to_include) + + async def load() -> pl.LazyFrame: + def first_embedding(features: set[Feature]) -> Feature | None: + for feature in features: + if feature.dtype.is_embedding: + return feature + return None + + embedding = first_embedding(data.request_result.features) + assert embedding, 'Expected to a least find one embedding in the input data' + + df = await data.to_polars() + + def cosine_similarity(vector, candidate): + import numpy as np + + vec1 = vector + vec2 = np.array(candidate) + + dot_product = np.dot(vec1, vec2) + norm_vec1 = np.linalg.norm(vec1) + norm_vec2 = np.linalg.norm(vec2) + + return dot_product / (norm_vec1 * norm_vec2) + + result: pl.DataFrame | None = None + + org_columns = df.columns + df_cols = len(df.columns) + + distance_key = 'distance' + + for item in df.iter_rows(named=True): + most_similar = ( + self.data.with_columns( + pl.col(embedding.name) + .map_elements( + lambda candidate: cosine_similarity(item[embedding.name], candidate), + ) + .alias(distance_key) + ) + .sort(distance_key, descending=True) + .head(number_of_records) + .select(pl.exclude(distance_key)) + ) + + if df_cols > 1: + most_similar = most_similar.select(pl.exclude(org_columns)).hstack( + pl.DataFrame([item] * most_similar.height) + .select(org_columns) + .select(pl.exclude(embedding.name)) + ) + + if result is None: + result = most_similar + else: + result = result.vstack(most_similar) + + if result is None: + return pl.DataFrame().lazy() + else: + return result.lazy() + + return RetrivalJob.from_lazy_function(load, request) + def with_view(self, view: 'CompiledFeatureView') -> 'InMemorySource': + + if self._vector_index_name is None: + self._vector_index_name = view.name + if self.data.is_empty(): return InMemorySource.from_values({feat.name: [] for feat in view.features}) return self diff --git a/pyproject.toml b/pyproject.toml index 4a3e456c..eadd9d3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aligned" -version = "0.0.106" +version = "0.0.107" description = "A data managment and lineage tool for ML applications." authors = ["Mats E. Mollestad "] license = "Apache-2.0"