From 6fdb275da3cf5f9800f70c723517394b87867a10 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 6 Feb 2024 05:14:37 +0100 Subject: [PATCH] Dataframes: Refactor examples about pandas and Dask --- .github/dependabot.yml | 10 ++ .github/workflows/df-dask.yml | 74 +++++++++++++ .github/workflows/df-pandas.yml | 74 +++++++++++++ ...t-python-sqlalchemy.yml => sqlalchemy.yml} | 8 +- .gitignore | 2 +- by-dataframe/dask/README.md | 101 ++++++++++++++++++ by-dataframe/dask/insert_dask.py | 73 +++++++++++++ by-dataframe/dask/pyproject.toml | 24 +++++ by-dataframe/dask/requirements-dev.txt | 2 + by-dataframe/dask/requirements.txt | 5 + by-dataframe/dask/test.py | 59 ++++++++++ by-dataframe/pandas/README.md | 101 ++++++++++++++++++ .../pandas}/insert_pandas.py | 22 ++-- by-dataframe/pandas/pyproject.toml | 24 +++++ by-dataframe/pandas/requirements-dev.txt | 2 + by-dataframe/pandas/requirements.txt | 5 + by-dataframe/pandas/test.py | 58 ++++++++++ by-language/python-sqlalchemy/README.rst | 73 ++----------- by-language/python-sqlalchemy/insert_dask.py | 46 -------- .../python-sqlalchemy/requirements-dev.txt | 1 - .../python-sqlalchemy/requirements.txt | 3 +- by-language/python-sqlalchemy/test.py | 10 -- 22 files changed, 634 insertions(+), 143 deletions(-) create mode 100644 .github/workflows/df-dask.yml create mode 100644 .github/workflows/df-pandas.yml rename .github/workflows/{test-python-sqlalchemy.yml => sqlalchemy.yml} (92%) create mode 100644 by-dataframe/dask/README.md create mode 100644 by-dataframe/dask/insert_dask.py create mode 100644 by-dataframe/dask/pyproject.toml create mode 100644 by-dataframe/dask/requirements-dev.txt create mode 100644 by-dataframe/dask/requirements.txt create mode 100644 by-dataframe/dask/test.py create mode 100644 by-dataframe/pandas/README.md rename {by-language/python-sqlalchemy => by-dataframe/pandas}/insert_pandas.py (91%) create mode 100644 by-dataframe/pandas/pyproject.toml create mode 100644 
by-dataframe/pandas/requirements-dev.txt create mode 100644 by-dataframe/pandas/requirements.txt create mode 100644 by-dataframe/pandas/test.py delete mode 100644 by-language/python-sqlalchemy/insert_dask.py diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 806214e0..3ba6eaf8 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -23,6 +23,16 @@ updates: # Languages. + - directory: "/by-dataframe/dask" + package-ecosystem: "pip" + schedule: + interval: "weekly" + + - directory: "/by-dataframe/pandas" + package-ecosystem: "pip" + schedule: + interval: "weekly" + - directory: "/by-language/csharp-npgsql" package-ecosystem: "nuget" schedule: diff --git a/.github/workflows/df-dask.yml b/.github/workflows/df-dask.yml new file mode 100644 index 00000000..da631caf --- /dev/null +++ b/.github/workflows/df-dask.yml @@ -0,0 +1,74 @@ +name: Dask + +on: + pull_request: + branches: ~ + paths: + - '.github/workflows/df-dask.yml' + - 'by-dataframe/dask/**' + - 'requirements.txt' + push: + branches: [ main ] + paths: + - '.github/workflows/df-dask.yml' + - 'by-dataframe/dask/**' + - 'requirements.txt' + + # Allow job to be triggered manually. + workflow_dispatch: + + # Run job each night after CrateDB nightly has been published. + schedule: + - cron: '0 3 * * *' + +# Cancel in-progress jobs when pushing to the same branch. 
+concurrency: + cancel-in-progress: true + group: ${{ github.workflow }}-${{ github.ref }} + +jobs: + test: + name: " + Python: ${{ matrix.python-version }} + CrateDB: ${{ matrix.cratedb-version }} + on ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ 'ubuntu-latest' ] + python-version: [ '3.11', '3.12' ] + cratedb-version: [ 'nightly' ] + + services: + cratedb: + image: crate/crate:${{ matrix.cratedb-version }} + ports: + - 4200:4200 + - 5432:5432 + env: + CRATE_HEAP_SIZE: 4g + + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + cache: 'pip' + cache-dependency-path: | + requirements.txt + by-dataframe/dask/requirements.txt + by-dataframe/dask/requirements-dev.txt + + - name: Install utilities + run: | + pip install -r requirements.txt + + - name: Validate by-dataframe/dask + run: | + ngr test --accept-no-venv by-dataframe/dask diff --git a/.github/workflows/df-pandas.yml b/.github/workflows/df-pandas.yml new file mode 100644 index 00000000..7c2e6813 --- /dev/null +++ b/.github/workflows/df-pandas.yml @@ -0,0 +1,74 @@ +name: pandas + +on: + pull_request: + branches: ~ + paths: + - '.github/workflows/df-pandas.yml' + - 'by-dataframe/pandas/**' + - 'requirements.txt' + push: + branches: [ main ] + paths: + - '.github/workflows/df-pandas.yml' + - 'by-dataframe/pandas/**' + - 'requirements.txt' + + # Allow job to be triggered manually. + workflow_dispatch: + + # Run job each night after CrateDB nightly has been published. + schedule: + - cron: '0 3 * * *' + +# Cancel in-progress jobs when pushing to the same branch. 
+concurrency: + cancel-in-progress: true + group: ${{ github.workflow }}-${{ github.ref }} + +jobs: + test: + name: " + Python: ${{ matrix.python-version }} + CrateDB: ${{ matrix.cratedb-version }} + on ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ 'ubuntu-latest' ] + python-version: [ '3.11', '3.12' ] + cratedb-version: [ 'nightly' ] + + services: + cratedb: + image: crate/crate:${{ matrix.cratedb-version }} + ports: + - 4200:4200 + - 5432:5432 + env: + CRATE_HEAP_SIZE: 4g + + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + cache: 'pip' + cache-dependency-path: | + requirements.txt + by-dataframe/pandas/requirements.txt + by-dataframe/pandas/requirements-dev.txt + + - name: Install utilities + run: | + pip install -r requirements.txt + + - name: Validate by-dataframe/pandas + run: | + ngr test --accept-no-venv by-dataframe/pandas diff --git a/.github/workflows/test-python-sqlalchemy.yml b/.github/workflows/sqlalchemy.yml similarity index 92% rename from .github/workflows/test-python-sqlalchemy.yml rename to .github/workflows/sqlalchemy.yml index e6a00a01..3c3b00c4 100644 --- a/.github/workflows/test-python-sqlalchemy.yml +++ b/.github/workflows/sqlalchemy.yml @@ -1,16 +1,16 @@ -name: Python SQLAlchemy +name: SQLAlchemy on: pull_request: branches: ~ paths: - - '.github/workflows/test-python-sqlalchemy.yml' + - '.github/workflows/sqlalchemy.yml' - 'by-language/python-sqlalchemy/**' - 'requirements.txt' push: branches: [ main ] paths: - - '.github/workflows/test-python-sqlalchemy.yml' + - '.github/workflows/sqlalchemy.yml' - 'by-language/python-sqlalchemy/**' - 'requirements.txt' @@ -46,6 +46,8 @@ jobs: ports: - 4200:4200 - 5432:5432 + env: + CRATE_HEAP_SIZE: 4g steps: diff --git a/.gitignore b/.gitignore index d75463fe..561298ff 100644 --- a/.gitignore +++ b/.gitignore 
@@ -1,7 +1,7 @@ .idea .venv* __pycache__ -.coverage +.coverage* .DS_Store coverage.xml mlruns/ diff --git a/by-dataframe/dask/README.md b/by-dataframe/dask/README.md new file mode 100644 index 00000000..f1c09046 --- /dev/null +++ b/by-dataframe/dask/README.md @@ -0,0 +1,101 @@ +# Connect to CrateDB and CrateDB Cloud using Dask + + +## About +Example programs demonstrating connectivity with [Dask] and [CrateDB]. + +This section and examples are mostly about [DataFrame operations with SQLAlchemy], +specifically about how to insert data into [CrateDB] efficiently. + + +## Usage + +The CrateDB Python driver provides a convenience function `insert_bulk`, +which allows you to efficiently insert multiple rows of data into a CrateDB +database table in a single operation. It can be used like this: + +```python +# CrateDB Cloud +# DBURI = "crate://admin:@.aks1.westeurope.azure.cratedb.net:4200?ssl=true" + +# CrateDB Self-Managed +# DBURI = "crate://crate@localhost:4200/" + +import sqlalchemy as sa +from crate.client.sqlalchemy.support import insert_bulk + +ddf.to_sql( + "testdrive", + uri=DBURI, + index=False, + if_exists="replace", + chunksize=10_000, + parallel=True, + method=insert_bulk, +) +``` + + +## Setup + +To start a CrateDB instance on your machine, invoke: +```shell +docker run -it --rm \ + --publish=4200:4200 --publish=5432:5432 \ + --env=CRATE_HEAP_SIZE=4g \ + crate:latest -Cdiscovery.type=single-node +``` + +Acquire `cratedb-examples` repository, and set up sandbox: +```shell +git clone https://github.com/crate/cratedb-examples +cd cratedb-examples +python3 -m venv .venv +source .venv/bin/activate +``` + +Then, invoke the integration test cases: +```shell +ngr test by-dataframe/dask +``` + + +## Examples +The `insert` example programs are about efficient data loading: +```shell +time python insert_dask.py +time python insert_dask.py --mode=basic +time python insert_dask.py --mode=bulk --bulk-size=20000 --num-records=75000 +``` + + +## Connect to CrateDB Cloud
+ +By default, the example programs will connect to CrateDB on `localhost`. +In order to connect to any other database instance, for example to [CrateDB +Cloud]: + +```shell +export DBURI="crate://crate@localhost:4200/" +export DBURI="crate://admin:@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true" +time python insert_dask.py --dburi="${DBURI}" +``` + +```{tip} +For more information, please refer to the header sections of each of the +provided example programs. +``` + + +## Tests + +To test the accompanied example programs all at once, invoke the software tests: +```shell +pytest +``` + + +[CrateDB]: https://github.com/crate/crate +[CrateDB Cloud]: https://console.cratedb.cloud/ +[Dask]: https://www.dask.org/ +[DataFrame operations with SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html diff --git a/by-dataframe/dask/insert_dask.py b/by-dataframe/dask/insert_dask.py new file mode 100644 index 00000000..46ac1c3a --- /dev/null +++ b/by-dataframe/dask/insert_dask.py @@ -0,0 +1,73 @@ +""" +About +===== + +Evaluate inserting data from Dask dataframes into CrateDB. + +Setup +===== +:: + + pip install --upgrade click colorlog 'crate[sqlalchemy]' pandas + +Synopsis +======== +:: + + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate:latest + python insert_dask.py +""" +import logging + +import click +import colorlog +import dask.dataframe as dd +import sqlalchemy as sa +from colorlog.escape_codes import escape_codes +from crate.client.sqlalchemy.support import insert_bulk +from dask.diagnostics import ProgressBar +from pueblo.testing.pandas import makeTimeDataFrame +from pueblo.util.logging import setup_logging + +logger = logging.getLogger(__name__) + +SQLALCHEMY_LOGGING = True +TABLE_NAME = "testdrive_dask" + + +def db_workload(dburi: str, mode: str, num_records: int, bulk_size: int): + pbar = ProgressBar() + pbar.register() + + # Create example Dask DataFrame for testing purposes. 
+ df = makeTimeDataFrame(nper=num_records, freq="S") + ddf = dd.from_pandas(df, npartitions=4) + + # Save DataFrame into CrateDB efficiently. + + # Works. Takes ~3 seconds. + if mode == "basic": + ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True) + + # Works. Takes ~10 seconds. + elif mode == "multi": + ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method="multi") + + # Works. Takes ~2 seconds. + elif mode == "bulk": + ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method=insert_bulk) + + +@click.command() +@click.option("--dburi", type=str, default="crate://localhost:4200", required=False, help="SQLAlchemy database connection URI.") +@click.option("--mode", type=click.Choice(['basic', 'multi', 'bulk']), default="bulk", required=False, help="Insert mode.") +@click.option("--num-records", type=int, default=125_000, required=False, help="Number of records to insert.") +@click.option("--bulk-size", type=int, default=10_000, required=False, help="Bulk size / chunk size.") +@click.help_option() +def main(dburi: str, mode: str, num_records: int, bulk_size: int): + setup_logging() + db_workload(dburi=dburi, mode=mode, num_records=num_records, bulk_size=bulk_size) + + +if __name__ == "__main__": + main() diff --git a/by-dataframe/dask/pyproject.toml b/by-dataframe/dask/pyproject.toml new file mode 100644 index 00000000..694534e2 --- /dev/null +++ b/by-dataframe/dask/pyproject.toml @@ -0,0 +1,24 @@ +[tool.pytest.ini_options] +minversion = "2.0" +addopts = """ + -rfEX -p pytester --strict-markers --verbosity=3 + --cov --cov-report=term-missing + --capture=no + """ +log_level = "DEBUG" +log_cli_level = "DEBUG" +testpaths = ["*.py"] +xfail_strict = true +markers = [ +] + + +[tool.coverage.run] +branch = false +omit = [ + "test*", +] + +[tool.coverage.report] +fail_under = 0 +show_missing = true diff --git 
a/by-dataframe/dask/requirements-dev.txt b/by-dataframe/dask/requirements-dev.txt new file mode 100644 index 00000000..e03009bf --- /dev/null +++ b/by-dataframe/dask/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest<9 +pytest-cov<5 diff --git a/by-dataframe/dask/requirements.txt b/by-dataframe/dask/requirements.txt new file mode 100644 index 00000000..bffe935a --- /dev/null +++ b/by-dataframe/dask/requirements.txt @@ -0,0 +1,5 @@ +click<9 +colorlog<7 +crate[sqlalchemy] +dask[dataframe]<=2024.1.1 +pueblo>=0.0.7 diff --git a/by-dataframe/dask/test.py b/by-dataframe/dask/test.py new file mode 100644 index 00000000..9c3d92fa --- /dev/null +++ b/by-dataframe/dask/test.py @@ -0,0 +1,59 @@ +import shlex +import subprocess +import pytest +import sqlalchemy as sa + + +DBURI = "crate://localhost:4200" + + +def run(command: str): + subprocess.check_call(shlex.split(command)) + + +def test_insert_dask_basic(reset_table): + cmd = "time python insert_dask.py --mode=basic --num-records=5000" + run(cmd) + assert get_table_cardinality() == 5000 + + +def test_insert_dask_multi(reset_table): + cmd = "time python insert_dask.py --mode=multi --num-records=5000 --bulk-size=1000" + run(cmd) + assert get_table_cardinality() == 5000 + + +def test_insert_dask_bulk(reset_table): + cmd = "time python insert_dask.py --mode=bulk --num-records=5000 --bulk-size=1000" + run(cmd) + assert get_table_cardinality() == 5000 + + +def test_insert_dask_unknown(reset_table): + cmd = "time python insert_dask.py --mode=foobar" + with pytest.raises(subprocess.CalledProcessError) as ex: + run(cmd) + assert ex.match("Command.+returned non-zero exit status") + + +@pytest.fixture +def reset_table(): + """Drop database tables used for testing.""" + engine = get_engine() + with engine.connect() as conn: + conn.exec_driver_sql("DROP TABLE IF EXISTS testdrive_dask;") + + +def get_engine(): + """Provide an SQLAlchemy `engine` instance.""" + return sa.create_engine(DBURI) + + +def get_table_cardinality(): + """Get number of 
records in table used for testing.""" + engine = get_engine() + with engine.connect() as conn: + conn.exec_driver_sql("REFRESH TABLE testdrive_dask;") + result = conn.exec_driver_sql("SELECT COUNT(*) FROM testdrive_dask;") + table_size = result.scalar_one() + return table_size diff --git a/by-dataframe/pandas/README.md b/by-dataframe/pandas/README.md new file mode 100644 index 00000000..4af2cb3e --- /dev/null +++ b/by-dataframe/pandas/README.md @@ -0,0 +1,101 @@ +# Connect to CrateDB and CrateDB Cloud using pandas + + +## About +Example programs demonstrating connectivity with [pandas] and [CrateDB]. + +This section and examples are mostly about [DataFrame operations with SQLAlchemy], +specifically about how to insert data into [CrateDB] efficiently. + + +## Usage + +The CrateDB Python driver provides a convenience function `insert_bulk`, +which allows you to efficiently insert multiple rows of data into a CrateDB +database table in a single operation. It can be used like this: + +```python +# CrateDB Cloud +# DBURI = "crate://admin:@.aks1.westeurope.azure.cratedb.net:4200?ssl=true" + +# CrateDB Self-Managed +# DBURI = "crate://crate@localhost:4200/" + +import sqlalchemy as sa +from crate.client.sqlalchemy.support import insert_bulk + +engine = sa.create_engine(DBURI, **kwargs) +df.to_sql( + name="testdrive", + con=engine, + if_exists="append", + index=False, + chunksize=5_000, + method=insert_bulk, +) +``` + + +## Setup + +To start a CrateDB instance on your machine, invoke: +```shell +docker run -it --rm \ + --publish=4200:4200 --publish=5432:5432 \ + --env=CRATE_HEAP_SIZE=4g \ + crate:latest -Cdiscovery.type=single-node +``` + +Acquire `cratedb-examples` repository, and set up sandbox: +```shell +git clone https://github.com/crate/cratedb-examples +cd cratedb-examples +python3 -m venv .venv +source .venv/bin/activate +``` + +Then, invoke the integration test cases: +```shell +ngr test by-dataframe/pandas +``` + + +## Examples +The `insert` example programs are
about efficient data loading: +```shell +time python insert_pandas.py +time python insert_pandas.py --mode=basic --insertmanyvalues-page-size=5000 +time python insert_pandas.py --mode=bulk --bulk-size=20000 --num-records=75000 +``` + + +## Connect to CrateDB Cloud + +By default, the example programs will connect to CrateDB on `localhost`. +In order to connect to any other database instance, for example to [CrateDB +Cloud]: + +```shell +export DBURI="crate://crate@localhost:4200/" +export DBURI="crate://admin:@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true" +time python insert_pandas.py --dburi="${DBURI}" +``` + +```{tip} +For more information, please refer to the header sections of each of the +provided example programs. +``` + + +## Tests + +To test the accompanied example programs all at once, invoke the software tests: +```shell +pytest +``` + + +[CrateDB]: https://github.com/crate/crate +[CrateDB Cloud]: https://console.cratedb.cloud/ +[DataFrame operations with SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html +[pandas]: https://pandas.pydata.org/ diff --git a/by-language/python-sqlalchemy/insert_pandas.py b/by-dataframe/pandas/insert_pandas.py similarity index 91% rename from by-language/python-sqlalchemy/insert_pandas.py rename to by-dataframe/pandas/insert_pandas.py index e86ca586..c268e083 100644 --- a/by-language/python-sqlalchemy/insert_pandas.py +++ b/by-dataframe/pandas/insert_pandas.py @@ -2,6 +2,8 @@ About ===== +Evaluate inserting data from pandas dataframes into CrateDB. + Example program to demonstrate efficient batched inserts using CrateDB and pandas, based on SQLAlchemy's `insertmanyvalues` vs. CrateDB's bulk import HTTP endpoint. @@ -14,7 +16,7 @@ ===== :: - pip install --upgrade click colorlog 'crate[sqlalchemy]' pandas sqlalchemy + pip install --upgrade click colorlog 'crate[sqlalchemy]' pandas Synopsis @@ -22,7 +24,7 @@ :: # Run CrateDB. 
- docker run --rm -it --publish=4200:4200 crate + docker run --rm -it --publish=4200:4200 crate:latest # Use local CrateDB. time python insert_pandas.py @@ -52,7 +54,7 @@ from colorlog.escape_codes import escape_codes from crate.client.sqlalchemy.support import insert_bulk from pueblo.testing.pandas import makeTimeDataFrame - +from pueblo.util.logging import setup_logging logger = logging.getLogger(__name__) @@ -62,7 +64,7 @@ class DatabaseWorkload: - table_name = "foo" + table_name = "testdrive_pandas" def __init__(self, dburi: str): self.dburi = dburi @@ -105,7 +107,7 @@ def process(self, mode: str, num_records: int, bulk_size: int, insertmanyvalues_ elif mode == "bulk": df.to_sql(name=self.table_name, con=engine, if_exists="append", index=False, chunksize=bulk_size, method=insert_bulk) - else: + else: # pragma: nocover raise ValueError(f"Unknown mode: {mode}") def show_table_stats(self): @@ -121,14 +123,7 @@ def show_table_stats(self): #engine.dispose() -def setup_logging(level=logging.INFO): - reset = escape_codes["reset"] - log_format = f"%(asctime)-15s [%(name)-26s] %(log_color)s%(levelname)-8s:{reset} %(message)s" - - handler = colorlog.StreamHandler() - handler.setFormatter(colorlog.ColoredFormatter(log_format)) - - logging.basicConfig(format=log_format, level=level, handlers=[handler]) +def tweak_log_levels(level=logging.INFO): # Enable SQLAlchemy logging. 
if SQLALCHEMY_LOGGING: @@ -144,6 +139,7 @@ def setup_logging(level=logging.INFO): @click.help_option() def main(dburi: str, mode: str, num_records: int, bulk_size: int, insertmanyvalues_page_size: int): setup_logging() + tweak_log_levels() dbw = DatabaseWorkload(dburi=dburi) dbw.process(mode, num_records, bulk_size, insertmanyvalues_page_size) dbw.show_table_stats() diff --git a/by-dataframe/pandas/pyproject.toml b/by-dataframe/pandas/pyproject.toml new file mode 100644 index 00000000..694534e2 --- /dev/null +++ b/by-dataframe/pandas/pyproject.toml @@ -0,0 +1,24 @@ +[tool.pytest.ini_options] +minversion = "2.0" +addopts = """ + -rfEX -p pytester --strict-markers --verbosity=3 + --cov --cov-report=term-missing + --capture=no + """ +log_level = "DEBUG" +log_cli_level = "DEBUG" +testpaths = ["*.py"] +xfail_strict = true +markers = [ +] + + +[tool.coverage.run] +branch = false +omit = [ + "test*", +] + +[tool.coverage.report] +fail_under = 0 +show_missing = true diff --git a/by-dataframe/pandas/requirements-dev.txt b/by-dataframe/pandas/requirements-dev.txt new file mode 100644 index 00000000..e03009bf --- /dev/null +++ b/by-dataframe/pandas/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest<9 +pytest-cov<5 diff --git a/by-dataframe/pandas/requirements.txt b/by-dataframe/pandas/requirements.txt new file mode 100644 index 00000000..ce7ae709 --- /dev/null +++ b/by-dataframe/pandas/requirements.txt @@ -0,0 +1,5 @@ +click<9 +colorlog<7 +crate[sqlalchemy] +pandas<2.3 +pueblo>=0.0.7 diff --git a/by-dataframe/pandas/test.py b/by-dataframe/pandas/test.py new file mode 100644 index 00000000..dba3b42a --- /dev/null +++ b/by-dataframe/pandas/test.py @@ -0,0 +1,58 @@ +import shlex +import subprocess +import pytest +import sqlalchemy as sa + +DBURI = "crate://localhost:4200" + + +def run(command: str): + subprocess.check_call(shlex.split(command)) + + +def test_insert_pandas_basic(reset_table): + cmd = "time python insert_pandas.py --mode=basic --num-records=5000 
--insertmanyvalues-page-size=1000" + run(cmd) + assert get_table_cardinality() == 5000 + + +def test_insert_pandas_multi(reset_table): + cmd = "time python insert_pandas.py --mode=multi --num-records=5000 --bulk-size=1000" + run(cmd) + assert get_table_cardinality() == 5000 + + +def test_insert_pandas_bulk(reset_table): + cmd = "time python insert_pandas.py --mode=bulk --num-records=5000 --bulk-size=1000" + run(cmd) + assert get_table_cardinality() == 5000 + + +def test_insert_pandas_unknown(reset_table): + cmd = "time python insert_pandas.py --mode=foobar" + with pytest.raises(subprocess.CalledProcessError) as ex: + run(cmd) + assert ex.match("Command.+returned non-zero exit status") + + +@pytest.fixture +def reset_table(): + """Drop database tables used for testing.""" + engine = get_engine() + with engine.connect() as conn: + conn.exec_driver_sql("DROP TABLE IF EXISTS testdrive_pandas;") + + +def get_engine(): + """Provide an SQLAlchemy `engine` instance.""" + return sa.create_engine(DBURI) + + +def get_table_cardinality(): + """Get number of records in table used for testing.""" + engine = get_engine() + with engine.connect() as conn: + conn.exec_driver_sql("REFRESH TABLE testdrive_pandas;") + result = conn.exec_driver_sql("SELECT COUNT(*) FROM testdrive_pandas;") + table_size = result.scalar_one() + return table_size diff --git a/by-language/python-sqlalchemy/README.rst b/by-language/python-sqlalchemy/README.rst index 9a31f2f4..251e6844 100644 --- a/by-language/python-sqlalchemy/README.rst +++ b/by-language/python-sqlalchemy/README.rst @@ -11,51 +11,6 @@ About Example programs demonstrating CrateDB's SQLAlchemy adapter and dialect. -This section and examples are mostly about `DataFrame operations with SQLAlchemy`_, -specifically about how to insert data into `CrateDB`_ efficiently using `pandas`_ and -`Dask`_. - - - -***** -Usage -***** - -The CrateDB Python driver provides a convenience function ``insert_bulk``. 
It -can be used like this:: - - # CrateDB Cloud - # DBURI = "crate://admin:@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true" - - # CrateDB Self-Managed - # DBURI = "crate://crate@localhost:4200/" - - import sqlalchemy as sa - from crate.client.sqlalchemy.support import insert_bulk - - # pandas - engine = sa.create_engine(DBURI, **kwargs) - df.to_sql( - name="testdrive", - con=engine, - if_exists="append", - index=False, - chunksize=5_000, - method=insert_bulk, - ) - - # Dask - ddf.to_sql( - "testdrive", - uri=DBURI, - index=False, - if_exists="replace", - chunksize=10_000, - parallel=True, - method=insert_bulk, - ) - - ***** Setup @@ -63,16 +18,19 @@ Setup To start a CrateDB instance on your machine for evaluation purposes, invoke:: - docker run -it --rm --publish=4200:4200 --publish=5432:5432 crate + docker run -it --rm --publish=4200:4200 --publish=5432:5432 crate:latest Navigate to example program directory, and install prerequisites:: # Acquire sources. git clone https://github.com/crate/cratedb-examples - cd cratedb-examples/by-language/python-sqlalchemy + cd cratedb-examples python3 -m venv .venv source .venv/bin/activate - pip install --upgrade --requirement requirements.txt + +Then, invoke the integration test cases:: + + ngr test by-language/python-sqlalchemy ******** @@ -84,12 +42,6 @@ The ``insert`` example programs are about efficient data loading:: time python insert_efficient.py cratedb multirow time python insert_efficient.py cratedb batched - time python insert_pandas.py - time python insert_pandas.py --mode=basic --insertmanyvalues-page-size=5000 - time python insert_pandas.py --mode=bulk --bulk-size=20000 --num-records=75000 - - time python insert_dask.py - The ``sync`` and ``async`` example programs demonstrate SQLAlchemy's low-level table/core API using both the HTTP-based transport driver using ``urllib3``, as well as the canonical ``asyncpg`` and ``psycopg3`` @@ -99,16 +51,6 @@ drivers using the PostgreSQL wire protocol:: time python 
async_table.py asyncpg psycopg time python async_streaming.py asyncpg psycopg -Connect to CrateDB Cloud -======================== - -By default, the example programs will connect to CrateDB on ``localhost``. -In order to connect to any other database instance, for example on CrateDB -Cloud:: - - export DBURI="crate://crate@localhost:4200/" - export DBURI="crate://admin:@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true" - time python insert_pandas.py --dburi="${DBURI}" .. TIP:: @@ -126,6 +68,3 @@ To test the accompanied example programs all at once, invoke the software tests: .. _CrateDB: https://github.com/crate/crate -.. _Dask: https://www.dask.org/ -.. _DataFrame operations with SQLAlchemy: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html -.. _pandas: https://pandas.pydata.org/ diff --git a/by-language/python-sqlalchemy/insert_dask.py b/by-language/python-sqlalchemy/insert_dask.py deleted file mode 100644 index 65a60aa1..00000000 --- a/by-language/python-sqlalchemy/insert_dask.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -About -===== - -Evaluate saving Dask DataFrames into CrateDB. - -Usage -===== -:: - - docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate - pip install --upgrade 'crate[sqlalchemy]' dask pandas - python insert_dask.py - -""" -import dask.dataframe as dd -from dask.diagnostics import ProgressBar -from crate.client.sqlalchemy.support import insert_bulk -from pueblo.testing.pandas import makeTimeDataFrame - - -DBURI = "crate://localhost:4200" - - -def main(): - pbar = ProgressBar() - pbar.register() - - # Create example Dask DataFrame for testing purposes. - df = makeTimeDataFrame(nper=125_000, freq="S") - ddf = dd.from_pandas(df, npartitions=4) - - # Save DataFrame into CrateDB efficiently. - - # Works. Takes ~3 seconds. - # ddf.to_sql("testdrive", uri=DBURI, index=False, if_exists="replace", chunksize=10_000, parallel=True) - - # Works. Takes ~10 seconds. 
- # ddf.to_sql("testdrive", uri=DBURI, index=False, if_exists="replace", chunksize=10_000, parallel=True, method="multi") - - # Works. Takes ~2 seconds. - ddf.to_sql("testdrive", uri=DBURI, index=False, if_exists="replace", chunksize=10_000, parallel=True, method=insert_bulk) - - -if __name__ == "__main__": - main() diff --git a/by-language/python-sqlalchemy/requirements-dev.txt b/by-language/python-sqlalchemy/requirements-dev.txt index c3833f9a..e03009bf 100644 --- a/by-language/python-sqlalchemy/requirements-dev.txt +++ b/by-language/python-sqlalchemy/requirements-dev.txt @@ -1,3 +1,2 @@ -pueblo>=0.0.7 pytest<9 pytest-cov<5 diff --git a/by-language/python-sqlalchemy/requirements.txt b/by-language/python-sqlalchemy/requirements.txt index 5b971e61..1e53d67b 100644 --- a/by-language/python-sqlalchemy/requirements.txt +++ b/by-language/python-sqlalchemy/requirements.txt @@ -1,6 +1,5 @@ click<9 colorlog<7 -dask==2024.1.1 -pandas<2.3 +pueblo>=0.0.7 sqlalchemy>=2,<2.1 sqlalchemy-cratedb[all] @ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/postgresql-async diff --git a/by-language/python-sqlalchemy/test.py b/by-language/python-sqlalchemy/test.py index fd168a29..2975f891 100644 --- a/by-language/python-sqlalchemy/test.py +++ b/by-language/python-sqlalchemy/test.py @@ -25,16 +25,6 @@ def test_insert_efficient_unknown(capfd): assert "ValueError: Unknown variant: unknown" in err -def test_insert_pandas(): - cmd = "time python insert_pandas.py" - run(cmd) - - -def test_insert_dask(): - cmd = "time python insert_dask.py" - run(cmd) - - def test_sync_table(): cmd = "time python sync_table.py urllib3 psycopg" run(cmd)