
Commit

[MLOP-635] Rebase Incremental Job/Interval Run branch for test on selected feature sets (#278)

* Add interval branch modifications.

* Add interval_runs notebook.

* Add tests.

* Apply style (black, flake8 and mypy).

* Fix tests.

* Change version to create a dev package.
moromimay authored Feb 19, 2021
1 parent d6ecfa4 commit 32e24d6
Showing 58 changed files with 4,738 additions and 389 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -47,6 +47,7 @@ coverage.xml
*.cover
.hypothesis/
*cov.xml
test_folder/

# Translations
*.mo
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -5,11 +5,17 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each

## [Unreleased]
### Added
* [MLOP-636] Create migration classes ([#282](https://github.com/quintoandar/butterfree/pull/282))

## [1.1.3](https://github.com/quintoandar/butterfree/releases/tag/1.1.3)
### Added
* [MLOP-599] Apply mypy to ButterFree ([#273](https://github.com/quintoandar/butterfree/pull/273))

### Changed
* [MLOP-634] Butterfree dev workflow, set triggers for branches staging and master ([#280](https://github.com/quintoandar/butterfree/pull/280))
* Keep milliseconds when using 'from_ms' argument in timestamp feature ([#284](https://github.com/quintoandar/butterfree/pull/284))
* [MLOP-633] Butterfree dev workflow, update documentation ([#281](https://github.com/quintoandar/butterfree/commit/74278986a49f1825beee0fd8df65a585764e5524))
* [MLOP-632] Butterfree dev workflow, automate release description ([#279](https://github.com/quintoandar/butterfree/commit/245eaa594846166972241b03fddc61ee5117b1f7))

### Fixed
* Change trigger for pipeline staging ([#287](https://github.com/quintoandar/butterfree/pull/287))
41 changes: 19 additions & 22 deletions butterfree/clients/cassandra_client.py
@@ -33,33 +33,31 @@ class CassandraClient(AbstractClient):
"""Cassandra Client.
Attributes:
- cassandra_user: username to use in connection.
- cassandra_password: password to use in connection.
- cassandra_key_space: key space used in connection.
- cassandra_host: cassandra endpoint used in connection.
+ user: username to use in connection.
+ password: password to use in connection.
+ keyspace: key space used in connection.
+ host: cassandra endpoint used in connection.
"""

def __init__(
self,
- cassandra_host: List[str],
- cassandra_key_space: str,
- cassandra_user: Optional[str] = None,
- cassandra_password: Optional[str] = None,
+ host: List[str],
+ keyspace: str,
+ user: Optional[str] = None,
+ password: Optional[str] = None,
) -> None:
- self.cassandra_host = cassandra_host
- self.cassandra_key_space = cassandra_key_space
- self.cassandra_user = cassandra_user
- self.cassandra_password = cassandra_password
+ self.host = host
+ self.keyspace = keyspace
+ self.user = user
+ self.password = password
self._session: Optional[Session] = None

@property
def conn(self, *, ssl_path: str = None) -> Session: # type: ignore
"""Establishes a Cassandra connection."""
auth_provider = (
- PlainTextAuthProvider(
- username=self.cassandra_user, password=self.cassandra_password
- )
- if self.cassandra_user is not None
+ PlainTextAuthProvider(username=self.user, password=self.password)
+ if self.user is not None
else None
)
ssl_opts = (
@@ -73,12 +71,12 @@ def conn(self, *, ssl_path: str = None) -> Session: # type: ignore
)

cluster = Cluster(
- contact_points=self.cassandra_host,
+ contact_points=self.host,
auth_provider=auth_provider,
ssl_options=ssl_opts,
load_balancing_policy=RoundRobinPolicy(),
)
- self._session = cluster.connect(self.cassandra_key_space)
+ self._session = cluster.connect(self.keyspace)
self._session.row_factory = dict_factory
return self._session

@@ -106,16 +104,15 @@ def get_schema(self, table: str) -> List[Dict[str, str]]:
"""
query = (
f"SELECT column_name, type FROM system_schema.columns " # noqa
f"WHERE keyspace_name = '{self.cassandra_key_space}' " # noqa
f"WHERE keyspace_name = '{self.keyspace}' " # noqa
f" AND table_name = '{table}';" # noqa
)

response = list(self.sql(query))

if not response:
raise RuntimeError(
f"No columns found for table: {table}"
f"in key space: {self.cassandra_key_space}"
f"No columns found for table: {table}" f"in key space: {self.keyspace}"
)

return response
@@ -143,7 +140,7 @@ def _get_create_table_query(
else:
columns_str = joined_parsed_columns

query = f"CREATE TABLE {self.cassandra_key_space}.{table} " f"({columns_str}); "
query = f"CREATE TABLE {self.keyspace}.{table} " f"({columns_str}); "

return query

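The net effect in this file is a plain rename of the connection attributes, dropping the cassandra_ prefix. A minimal usage sketch of the new constructor follows; the host, keyspace, credential, and table values are illustrative, not taken from the source:

    client = CassandraClient(
        host=["127.0.0.1"],
        keyspace="feature_store",
        user="cassandra",  # optional: when user is None, no auth provider is configured
        password="cassandra",  # optional
    )
    schema = client.get_schema("house_features")  # list of {"column_name": ..., "type": ...} dicts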
58 changes: 53 additions & 5 deletions butterfree/clients/spark_client.py
@@ -34,9 +34,10 @@ def conn(self) -> SparkSession:
def read(
self,
format: str,
- options: Dict[str, Any],
+ path: Optional[Union[str, List[str]]] = None,
schema: Optional[StructType] = None,
stream: bool = False,
+ **options: Any,
) -> DataFrame:
"""Use the SparkSession.read interface to load data into a dataframe.
@@ -45,24 +46,27 @@ def read(
Args:
format: string with the format to be used by the DataframeReader.
- options: options to setup the DataframeReader.
+ path: optional string or a list of string for file-system.
stream: flag to indicate if data must be read in stream mode.
schema: an optional pyspark.sql.types.StructType for the input schema.
+ options: options to setup the DataframeReader.
Returns:
Dataframe
"""
if not isinstance(format, str):
raise ValueError("format needs to be a string with the desired read format")
- if not isinstance(options, dict):
- raise ValueError("options needs to be a dict with the setup configurations")
+ if not isinstance(path, (str, list)):
+ raise ValueError("path needs to be a string or a list of string")

df_reader: Union[
DataStreamReader, DataFrameReader
] = self.conn.readStream if stream else self.conn.read

df_reader = df_reader.schema(schema) if schema else df_reader
- return df_reader.format(format).options(**options).load()
+ return df_reader.format(format).load(path, **options) # type: ignore
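The signature change moves reader options from a required dict to trailing keyword arguments and adds an explicit path parameter. A sketch of a call under the new signature; the bucket path is a hypothetical location and mergeSchema is just one example of a reader option forwarded via **options:

    spark_client = SparkClient()
    df = spark_client.read(
        format="parquet",
        path="s3a://some-bucket/feature-sets/house_features",  # hypothetical location
        mergeSchema="true",  # extra kwargs are passed through to the DataFrameReader
    )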

def read_table(self, table: str, database: str = None) -> DataFrame:
"""Use the SparkSession.read interface to read a metastore table.
@@ -223,3 +227,47 @@ def create_temporary_view(self, dataframe: DataFrame, name: str) -> Any:
if not dataframe.isStreaming:
return dataframe.createOrReplaceTempView(name)
return dataframe.writeStream.format("memory").queryName(name).start()

def add_table_partitions(
self, partitions: List[Dict[str, Any]], table: str, database: str = None
) -> None:
"""Add partitions to an existing table.
Args:
partitions: partitions to add to the table.
It's expected a list of partition dicts to add to the table.
Example: `[{"year": 2020, "month": 8, "day": 14}, ...]`
table: table to add the partitions.
database: name of the database where the table is saved.
"""
for partition_dict in partitions:
if not all(
(
isinstance(key, str)
and (isinstance(value, str) or isinstance(value, int))
)
for key, value in partition_dict.items()
):
raise ValueError(
"Partition keys must be column names "
"and values must be string or int."
)

database_expr = f"`{database}`." if database else ""
key_values_expr = [
", ".join(
[
"{} = {}".format(k, v)
if not isinstance(v, str)
else "{} = '{}'".format(k, v)
for k, v in partition.items()
]
)
for partition in partitions
]
partitions_expr = " ".join(f"PARTITION ( {expr} )" for expr in key_values_expr)
command = (
f"ALTER TABLE {database_expr}`{table}` ADD IF NOT EXISTS {partitions_expr}"
)

self.conn.sql(command)
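The new method validates every partition dict, renders one PARTITION clause per dict (quoting string values, leaving ints bare), and issues a single ALTER TABLE ... ADD IF NOT EXISTS statement. A sketch of a call and the statement it should produce; the table and database names are illustrative:

    spark_client.add_table_partitions(
        partitions=[{"year": 2020, "month": 8, "day": 14}],
        table="house_features",
        database="feature_store",
    )
    # Produces roughly:
    # ALTER TABLE `feature_store`.`house_features` ADD IF NOT EXISTS
    #   PARTITION ( year = 2020, month = 8, day = 14 )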
28 changes: 28 additions & 0 deletions butterfree/configs/db/metastore_config.py
@@ -3,8 +3,11 @@
import os
from typing import Any, Dict, List, Optional

from pyspark.sql import DataFrame

from butterfree.configs import environment
from butterfree.configs.db import AbstractWriteConfig
from butterfree.dataframe_service import extract_partition_values


class MetastoreConfig(AbstractWriteConfig):
@@ -87,6 +90,31 @@ def get_options(self, key: str) -> Dict[Optional[str], Optional[str]]:
"path": os.path.join(f"{self.file_system}://{self.path}/", key),
}

def get_path_with_partitions(self, key: str, dataframe: DataFrame) -> List:
"""Get options for AWS S3 from partitioned parquet file.
Options will be a dictionary with the write and read configuration for
Spark to AWS S3.
Args:
key: path to save data into AWS S3 bucket.
dataframe: spark dataframe containing data from a feature set.
Returns:
A list of string for file-system backed data sources.
"""
path_list = []
dataframe_values = extract_partition_values(
dataframe, partition_columns=["year", "month", "day"]
)
for row in dataframe_values:
path_list.append(
f"{self.file_system}://{self.path}/{key}/year={row['year']}/"
f"month={row['month']}/day={row['day']}"
)

return path_list

def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Translate feature set spark schema to the corresponding database."""
pass
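get_path_with_partitions builds one fully resolved file-system path per distinct year/month/day combination found in the dataframe, which pairs with the new path argument of SparkClient.read. A sketch of the expected output; the constructor arguments and bucket name are assumptions, not confirmed by this diff:

    config = MetastoreConfig(path="some-bucket/feature-sets", file_system="s3a")  # hypothetical args
    paths = config.get_path_with_partitions("house_features", dataframe)
    # e.g. ["s3a://some-bucket/feature-sets/house_features/year=2020/month=8/day=14", ...]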
4 changes: 2 additions & 2 deletions butterfree/configs/environment.py
@@ -35,8 +35,8 @@ def get_variable(variable_name: str, default_value: str = None) -> Optional[str]
"""Gets an environment variable.
The variable comes from it's explicitly declared value in the running
- environment or from the default value declared in the environment.yaml
- specification or from the default_value.
+ environment or from the default value declared in specification or from the
+ default_value.
Args:
variable_name: environment variable name.
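The reworded docstring reflects the resolution order: an explicitly set environment variable wins, then the default declared in the specification, then the caller's default_value. For example (the variable name here is illustrative):

    from butterfree.configs import environment

    cassandra_host = environment.get_variable("CASSANDRA_HOST", default_value="localhost")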
16 changes: 16 additions & 0 deletions butterfree/constants/window_definitions.py
@@ -0,0 +1,16 @@
"""Allowed windows units and lengths in seconds."""

ALLOWED_WINDOWS = {
"second": 1,
"seconds": 1,
"minute": 60,
"minutes": 60,
"hour": 3600,
"hours": 3600,
"day": 86400,
"days": 86400,
"week": 604800,
"weeks": 604800,
"year": 29030400,
"years": 29030400,
}
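A mapping like this typically backs validation and conversion of window spans into seconds. A minimal sketch of such a lookup; span_to_seconds is a hypothetical helper, not part of this commit:

    from butterfree.constants.window_definitions import ALLOWED_WINDOWS

    def span_to_seconds(amount: int, unit: str) -> int:
        """Convert e.g. (7, "days") into 604800 seconds, rejecting unknown units."""
        if unit not in ALLOWED_WINDOWS:
            raise ValueError(f"Window unit must be one of: {list(ALLOWED_WINDOWS)}")
        return amount * ALLOWED_WINDOWS[unit]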
9 changes: 8 additions & 1 deletion butterfree/dataframe_service/__init__.py
@@ -1,4 +1,11 @@
"""Dataframe optimization components regarding Butterfree."""
+ from butterfree.dataframe_service.incremental_strategy import IncrementalStrategy
+ from butterfree.dataframe_service.partitioning import extract_partition_values
from butterfree.dataframe_service.repartition import repartition_df, repartition_sort_df

- __all__ = ["repartition_df", "repartition_sort_df"]
+ __all__ = [
+ "extract_partition_values",
+ "IncrementalStrategy",
+ "repartition_df",
+ "repartition_sort_df",
+ ]
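With the expanded __all__, the new helpers are importable straight from the package:

    from butterfree.dataframe_service import IncrementalStrategy, extract_partition_values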