From 5967722b0dbf529ca5e361a1e97346a71ccb6aed Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 23 Jun 2024 01:59:50 +0200 Subject: [PATCH] Dask interface: Accept and forward the new `if-exists` query param The new `if-exists` query parameter will be forwarded to Dask's `to_sql()` method. By default, `influxio` will use `fail`. When targeting the SQLAlchemy database interface, the target table will be created automatically, if it does not exist. The `if-exists` URL query parameter can be used to configure this behavior. The default value is `fail`. * fail: Raise a ValueError. * replace: Drop the table before inserting new values. * append: Insert new values to the existing table. --- CHANGES.rst | 2 + README.rst | 23 ++++++++++ examples/export_sqlalchemy.py | 2 +- influxio/adapter.py | 17 +++++--- influxio/io.py | 10 ++++- tests/test_adapter.py | 9 ++++ tests/test_export.py | 80 +++++++++++++++++++++++++++++++++-- 7 files changed, 133 insertions(+), 10 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 232d1e5..e09b958 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,6 +5,8 @@ Changelog in progress =========== +- Dask interface: Accept and forward the new ``if-exists`` URL query + parameter to Dask's ``to_sql()`` method. 2024-06-13 v0.3.1 ================= diff --git a/README.rst b/README.rst index 2519801..0da9836 100644 --- a/README.rst +++ b/README.rst @@ -287,6 +287,29 @@ keystrokes on subsequent invocations. influxio copy "${SOURCE}" "${TARGET}" +Parameters +========== + +``if-exists`` +------------- +When targeting the SQLAlchemy database interface, the target table will be +created automatically, if it does not exist. The ``if-exists`` URL query +parameter can be used to configure this behavior. The default value is +``fail``. + +* fail: Raise a ValueError. +* replace: Drop the table before inserting new values. +* append: Insert new values to the existing table. + +Example usage: + +.. 
code-block:: shell + + influxio copy \ + "http://example:token@localhost:8086/testdrive/demo" \ + "crate://crate@localhost:4200/testdrive?table=demo&if-exists=replace" + + ******************* Project information ******************* diff --git a/examples/export_sqlalchemy.py b/examples/export_sqlalchemy.py index 597f16f..98b91c9 100644 --- a/examples/export_sqlalchemy.py +++ b/examples/export_sqlalchemy.py @@ -41,7 +41,7 @@ def main(): logger.info("Transferring data") for df in influx.read_df(): logger.info("Loading data frame into RDBMS/SQL database using pandas/Dask") - dataframe_to_sql(df, dburi=DBURI, tablename="demo", progress=True) + dataframe_to_sql(df, dburi=DBURI, tablename="demo", if_exists="replace", progress=True) # Read back data from target database. logger.info("Reading back data from the target database") diff --git a/influxio/adapter.py b/influxio/adapter.py index a58d126..a956a82 100644 --- a/influxio/adapter.py +++ b/influxio/adapter.py @@ -271,7 +271,8 @@ def __init__(self, url: t.Union[URL, str], progress: bool = False, debug: bool = if isinstance(url, str): url: URL = URL(url) - self.database, self.table = SqlAlchemyAdapter.decode_database_table(url) + self.database, self.table = self.decode_database_table(url) + self.if_exists = url.query.get("if-exists") # Special handling for SQLite and CrateDB databases. 
self.dburi = str(url.with_query(None)) @@ -301,9 +302,13 @@ def write(self, source: t.Union[pd.DataFrame, InfluxDbApiAdapter]): logger.info("Loading dataframes into RDBMS/SQL database using pandas/Dask") if isinstance(source, InfluxDbApiAdapter): for df in source.read_df(): - dataframe_to_sql(df, dburi=self.dburi, tablename=self.table, progress=self.progress) + dataframe_to_sql( + df, dburi=self.dburi, tablename=self.table, if_exists=self.if_exists, progress=self.progress + ) elif isinstance(source, pd.DataFrame): - dataframe_to_sql(source, dburi=self.dburi, tablename=self.table, progress=self.progress) + dataframe_to_sql( + source, dburi=self.dburi, tablename=self.table, if_exists=self.if_exists, progress=self.progress + ) else: raise NotImplementedError(f"Failed handling source: {source}") @@ -329,13 +334,15 @@ def create_database(self): def run_sql(self, sql: str): engine = sa.create_engine(self.dburi) with engine.connect() as connection: - connection.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + if hasattr(connection.connection, "set_isolation_level"): + connection.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) return connection.execute(sa.text(sql)) def run_sql_raw(self, sql: str): engine = sa.create_engine(self.dburi) connection = engine.raw_connection() - connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + if hasattr(connection, "set_isolation_level"): + connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() cursor.execute(sql) result = cursor.fetchall() diff --git a/influxio/io.py b/influxio/io.py index b108dea..79c60eb 100644 --- a/influxio/io.py +++ b/influxio/io.py @@ -97,7 +97,7 @@ def dataframe_to_sql( tablename: str, index=False, chunksize=None, - if_exists="replace", + if_exists="fail", npartitions: int = None, progress: bool = False, ): @@ -105,10 +105,18 @@ def dataframe_to_sql( Load pandas dataframe 
into database using Dask. https://stackoverflow.com/questions/62404502/using-dasks-new-to-sql-for-improved-efficiency-memory-speed-or-alternative-to + + if_exists : {'fail', 'replace', 'append'}, default 'fail' + How to behave if the table already exists. + + * fail: Raise a ValueError. + * replace: Drop the table before inserting new values. + * append: Insert new values to the existing table. """ import dask.dataframe as dd # Set a few defaults. + if_exists = if_exists or "fail" chunksize = chunksize or 5_000 npartitions = npartitions or int(os.cpu_count() / 2) diff --git a/tests/test_adapter.py b/tests/test_adapter.py index 39f2a23..d87d5e6 100644 --- a/tests/test_adapter.py +++ b/tests/test_adapter.py @@ -42,6 +42,15 @@ def test_cratedb_adapter_database_table(): assert adapter.database == "testdrive" assert adapter.table == "basic" assert adapter.dburi == "crate://localhost:4200/?schema=testdrive" + assert adapter.if_exists is None + + +def test_cratedb_adapter_if_exists(): + adapter = SqlAlchemyAdapter.from_url("crate://localhost:4200/?database=testdrive&table=basic&if-exists=append") + assert adapter.database == "testdrive" + assert adapter.table == "basic" + assert adapter.dburi == "crate://localhost:4200/?schema=testdrive" + assert adapter.if_exists == "append" def test_file_adapter_ilp_file(): diff --git a/tests/test_export.py b/tests/test_export.py index 6a9377b..b13af2b 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -21,7 +21,9 @@ def influxdb() -> InfluxDbApiAdapter: @pytest.fixture def cratedb() -> SqlAlchemyAdapter: - return SqlAlchemyAdapter.from_url(CRATEDB_URL) + adapter = SqlAlchemyAdapter.from_url(CRATEDB_URL) + adapter.run_sql("DROP TABLE IF EXISTS basic") + return adapter @pytest.fixture @@ -63,9 +65,9 @@ def provision_influxdb(influxdb, line_protocol_file_basic): influxio.core.copy(source_url, target_url) -def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb): +def test_export_cratedb_default(caplog, influxdb, 
provision_influxdb, cratedb): """ - Export data from InfluxDB to CrateDB. + Export data from InfluxDB to CrateDB, happy path. """ source_url = INFLUXDB_API_URL @@ -84,6 +86,78 @@ def test_export_cratedb(caplog, influxdb, provision_influxdb, cratedb): assert len(records) == 2 +def test_export_cratedb_fail_if_target_exists(caplog, influxdb, provision_influxdb, cratedb): + """ + Exporting data from InfluxDB to CrateDB should fail if target table exists. + """ + + source_url = INFLUXDB_API_URL + target_url = CRATEDB_URL + + # Create a table that will cause the export process to fail. + cratedb.run_sql("CREATE TABLE basic (foo INT)") + + # Transfer data. + with pytest.raises(ValueError) as ex: + influxio.core.copy(source_url, target_url) + ex.match("Table 'basic' already exists.") + + +def test_export_cratedb_if_exists_unknown(caplog, influxdb, provision_influxdb, cratedb): + """ + Exporting data from InfluxDB to CrateDB should fail when an unknown ``if-exists`` parameter value is given. + """ + + source_url = INFLUXDB_API_URL + target_url = CRATEDB_URL + "?if-exists=Hotzenplotz" + + # Create a table that will cause the export process to fail. + cratedb.run_sql("CREATE TABLE basic (foo INT)") + + # Transfer data. + with pytest.raises(ValueError) as ex: + influxio.core.copy(source_url, target_url) + ex.match("'Hotzenplotz' is not valid for if_exists") + + +def test_export_cratedb_if_exists_replace(caplog, influxdb, provision_influxdb, cratedb): + """ + Exporting data from InfluxDB to CrateDB will succeed with ``if-exists=replace``. + """ + + source_url = INFLUXDB_API_URL + target_url = CRATEDB_URL + "?if-exists=replace" + + # Create a table that would cause the export process to fail. + cratedb.run_sql("CREATE TABLE basic (foo INT)") + + # Transfer data. + influxio.core.copy(source_url, target_url) + + # Verify number of records in target database.
+ cratedb.refresh_table() + records = cratedb.read_records() + assert len(records) == 2 + + +def test_export_cratedb_if_exists_append(caplog, influxdb, provision_influxdb, cratedb): + """ + Exporting data from InfluxDB to CrateDB twice will succeed with ``if-exists=append``. + """ + + source_url = INFLUXDB_API_URL + target_url = CRATEDB_URL + "?if-exists=append" + + # Transfer data. + influxio.core.copy(source_url, target_url) + influxio.core.copy(source_url, target_url) + + # Verify number of records in target database. + cratedb.refresh_table() + records = cratedb.read_records() + assert len(records) == 4 + + def test_export_postgresql(caplog, influxdb, provision_influxdb, postgresql): """ Export data from InfluxDB to PostgreSQL.