From d6ecfa425136fab07826c01d0a7ac271f7a37a30 Mon Sep 17 00:00:00 2001 From: AlvaroMarquesAndrade <45604858+AlvaroMarquesAndrade@users.noreply.github.com> Date: Thu, 18 Feb 2021 15:14:44 -0300 Subject: [PATCH] [MLOP-636] Create migration classes (#282) --- butterfree/migrations/__init__.py | 7 +++ butterfree/migrations/cassandra_migration.py | 23 ++++++++ butterfree/migrations/metastore_migration.py | 23 ++++++++ butterfree/migrations/migration.py | 62 ++++++++++++++++++++ 4 files changed, 115 insertions(+) create mode 100644 butterfree/migrations/__init__.py create mode 100644 butterfree/migrations/cassandra_migration.py create mode 100644 butterfree/migrations/metastore_migration.py create mode 100644 butterfree/migrations/migration.py diff --git a/butterfree/migrations/__init__.py b/butterfree/migrations/__init__.py new file mode 100644 index 00000000..5f709bfe --- /dev/null +++ b/butterfree/migrations/__init__.py @@ -0,0 +1,7 @@ +"""Holds available migrations.""" + +from butterfree.migrations.cassandra_migration import CassandraMigration +from butterfree.migrations.metastore_migration import MetastoreMigration +from butterfree.migrations.migration import DatabaseMigration + +__all__ = ["DatabaseMigration", "CassandraMigration", "MetastoreMigration"] diff --git a/butterfree/migrations/cassandra_migration.py b/butterfree/migrations/cassandra_migration.py new file mode 100644 index 00000000..e9cecdc7 --- /dev/null +++ b/butterfree/migrations/cassandra_migration.py @@ -0,0 +1,23 @@ +"""Cassandra Migration entity.""" + +from typing import Any, Dict, List + +from butterfree.migrations import DatabaseMigration + + +class CassandraMigration(DatabaseMigration): + """Cassandra class for Migrations.""" + + def create_query( + self, + fs_schema: List[Dict[str, Any]], + db_schema: List[Dict[str, Any]], + table_name: str, + ) -> Any: + """Create a query regarding Cassandra. + + Returns: + Schema object. + + """ + pass diff --git a/butterfree/migrations/metastore_migration.py b/butterfree/migrations/metastore_migration.py new file mode 100644 index 00000000..bb208f2a --- /dev/null +++ b/butterfree/migrations/metastore_migration.py @@ -0,0 +1,23 @@ +"""Metastore Migration entity.""" + +from typing import Any, Dict, List + +from butterfree.migrations import DatabaseMigration + + +class MetastoreMigration(DatabaseMigration): + """Metastore class for Migrations.""" + + def create_query( + self, + fs_schema: List[Dict[str, Any]], + db_schema: List[Dict[str, Any]], + table_name: str, + ) -> Any: + """Create a query regarding Metastore. + + Returns: + Schema object. + + """ + pass diff --git a/butterfree/migrations/migration.py b/butterfree/migrations/migration.py new file mode 100644 index 00000000..c53945bf --- /dev/null +++ b/butterfree/migrations/migration.py @@ -0,0 +1,62 @@ +"""Migration entity.""" + +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, List + +from butterfree.pipelines import FeatureSetPipeline + + +class DatabaseMigration(ABC): + """Abstract base class for Migrations.""" + + @abstractmethod + def create_query( + self, + fs_schema: List[Dict[str, Any]], + db_schema: List[Dict[str, Any]], + table_name: str, + ) -> Any: + """Create a query regarding a data source. + + Returns: + The desired query for the given database. + + """ + + def _validate_schema( + self, fs_schema: List[Dict[str, Any]], db_schema: List[Dict[str, Any]] + ) -> Any: + """Provides schema validation for feature sets. + + Compares the schema of your local feature set to the + corresponding table in a given database. + + Args: + fs_schema: object that contains feature set's schemas. + db_schema: object that contains the table og a given db schema. + + """ + + def _get_schema(self, db_client: Callable, table_name: str) -> List[Dict[str, Any]]: + """Get a table schema in the respective database. + + Returns: + Schema object. + """ + pass + + def _apply_migration(self, query: str, db_client: Callable) -> None: + """Apply the migration in the respective database.""" + + def _send_logs_to_s3(self) -> None: + """Send all migration logs to S3.""" + pass + + def run(self, pipelines: List[FeatureSetPipeline]) -> None: + """Runs the migrations. + + Args: + pipelines: the feature set pipelines. + + """ + pass