diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d3e7ba5..26d5f80d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each ## [Unreleased] +## [1.4.5](https://github.com/quintoandar/butterfree/releases/tag/1.4.5) +* Rollback repartitions ([#386](https://github.com/quintoandar/butterfree/pull/386)) +* Add protection to host setting on cassandra_client ([#385](https://github.com/quintoandar/butterfree/pull/385)) + ## [1.4.4](https://github.com/quintoandar/butterfree/releases/tag/1.4.4) * Fix Storage Level ([#382](https://github.com/quintoandar/butterfree/pull/382)) diff --git a/Makefile b/Makefile index a93104ab..2156f9b0 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,7 @@ style-check: @echo "Code Style" @echo "==========" @echo "" - @python -m black --check -t py39 --exclude="build/|buck-out/|dist/|_build/|pip/|\.pip/|\.git/|\.hg/|\.mypy_cache/|\.tox/|\.venv/" . && echo "\n\nSuccess" || (echo "\n\nFailure\n\nYou need to run \"make apply-style\" to apply style formatting to your code"; exit 1) + @python -m black --check -t py39 --exclude="build/|buck-out/|dist/|_build/|pip/|\.pip/|\.git/|\.hg/|\.mypy_cache/|\.tox/|\.venv/|venv/" . 
&& echo "\n\nSuccess" || (echo "\n\nFailure\n\nYou need to run \"make apply-style\" to apply style formatting to your code"; exit 1) .PHONY: quality-check ## run code quality checks with flake8 @@ -85,7 +85,7 @@ quality-check: @echo "Flake 8" @echo "=======" @echo "" - @python -m flake8 && echo "Success" + python -m flake8 --exclude="dist,build,pip,.pip,deps,.venv,venv,.git,.hg,.mypy_cache,.tox" && echo "Success" @echo "" .PHONY: type-check @@ -95,7 +95,7 @@ type-check: @echo "mypy" @echo "====" @echo "" - @python -m mypy butterfree + @python -m mypy --exclude="build/|buck-out/|dist/|_build/|pip/|\.pip/|\.git/|\.hg/|\.mypy_cache/|\.tox/|\.venv/|venv/" butterfree .PHONY: checks ## run all code checks @@ -104,7 +104,7 @@ checks: style-check quality-check type-check .PHONY: apply-style ## fix stylistic errors with black apply-style: - @python -m black -t py39 --exclude="build/|buck-out/|dist/|_build/|pip/|\.pip/|\.git/|\.hg/|\.mypy_cache/|\.tox/|\.venv/" . + @python -m black -t py39 --exclude="build/|buck-out/|dist/|_build/|pip/|\.pip/|\.git/|\.hg/|\.mypy_cache/|\.tox/|\.venv/|venv/" . @python -m isort --atomic butterfree/ tests/ .PHONY: clean diff --git a/butterfree/clients/cassandra_client.py b/butterfree/clients/cassandra_client.py index 714e8248..0b300844 100644 --- a/butterfree/clients/cassandra_client.py +++ b/butterfree/clients/cassandra_client.py @@ -1,7 +1,7 @@ """CassandraClient entity.""" from ssl import CERT_REQUIRED, PROTOCOL_TLSv1 -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from cassandra.auth import PlainTextAuthProvider from cassandra.cluster import ( @@ -16,6 +16,12 @@ from typing_extensions import TypedDict from butterfree.clients import AbstractClient +from butterfree.configs.logger import __logger + +logger = __logger("cassandra_client") + +EMPTY_STRING_HOST_ERROR = "The value of Cassandra host is empty. 
def _validate_and_format_cassandra_host(
    self, host: Union[List, str]
) -> List[str]:
    """Validate the Cassandra host setting and normalize it to a list.

    Accepts either a string or a list of strings. Every string is split
    on commas, each piece is stripped of surrounding whitespace, and blank
    pieces are discarded, so ``"a, b"``, ``["a, b"]`` and ``["a", "b"]``
    all produce ``["a", "b"]``.

    Args:
        host: A comma-separated string of endpoints, or a list of such
            strings.

    Returns:
        A non-empty list of host strings.

    Raises:
        ValueError: With ``GENERIC_INVALID_HOST_ERROR`` when ``host`` is
            neither a string nor a list containing only strings, or with
            ``EMPTY_STRING_HOST_ERROR`` when no non-blank host remains
            (for example ``""``, ``[]``, ``[""]`` or ``" , "``).
    """
    if isinstance(host, str):
        entries = [host]
    elif isinstance(host, list) and all(isinstance(item, str) for item in host):
        entries = host
    else:
        raise ValueError(GENERIC_INVALID_HOST_ERROR)

    # Split every entry, not only a lone one, so that a list mixing plain
    # hosts and comma-separated strings is normalized the same way.
    pieces = (piece.strip() for entry in entries for piece in entry.split(","))
    hosts = [piece for piece in pieces if piece]

    # all() is vacuously true for [], and splitting "" or "a,," leaves blank
    # pieces; without this check an unusable host list would be accepted here
    # and only fail later when the connection is attempted.
    if not hosts:
        raise ValueError(EMPTY_STRING_HOST_ERROR)

    return hosts
def test_initialize_with_string_host(self):
    """A comma-separated string host becomes a list of trimmed hosts."""
    expected_hosts = ["127.0.0.0", "127.0.0.1"]
    cassandra = CassandraClient(
        host="127.0.0.0, 127.0.0.1", keyspace="dummy_keyspace"
    )
    assert cassandra.host == expected_hosts

def test_initialize_with_list_host(self):
    """A list of host strings is kept as the client's host list."""
    hosts = ["127.0.0.0", "127.0.0.1"]
    cassandra = CassandraClient(host=hosts, keyspace="test_keyspace")
    assert cassandra.host == ["127.0.0.0", "127.0.0.1"]

def test_initialize_with_empty_string_host(self):
    """An empty string host raises ValueError with the empty-host message."""
    with pytest.raises(ValueError, match=EMPTY_STRING_HOST_ERROR):
        CassandraClient(host="", keyspace="test_keyspace")

def test_initialize_with_none_host(self):
    """A None host raises ValueError with the generic invalid-host message."""
    with pytest.raises(ValueError, match=GENERIC_INVALID_HOST_ERROR):
        CassandraClient(host=None, keyspace="test_keyspace")

def test_initialize_with_invalid_host_type(self):
    """An integer host raises ValueError with the generic message."""
    with pytest.raises(ValueError, match=GENERIC_INVALID_HOST_ERROR):
        CassandraClient(host=123, keyspace="test_keyspace")

def test_initialize_with_invalid_list_host(self):
    """A list containing a non-string item raises ValueError."""
    mixed_hosts = ["127.0.0.0", 123]
    with pytest.raises(ValueError, match=GENERIC_INVALID_HOST_ERROR):
        CassandraClient(host=mixed_hosts, keyspace="test_keyspace")

def test_initialize_with_list_of_string_hosts(self):
    """A one-item list holding a comma-separated string is split."""
    cassandra = CassandraClient(
        host=["127.0.0.0, 127.0.0.1"], keyspace="test_keyspace"
    )
    expected_hosts = ["127.0.0.0", "127.0.0.1"]
    assert cassandra.host == expected_hosts