Dataframes: Refactor examples about pandas and Dask
amotl committed Feb 6, 2024
1 parent fc1fd00 commit 1357c23
Showing 22 changed files with 632 additions and 145 deletions.
10 changes: 10 additions & 0 deletions .github/dependabot.yml
@@ -23,6 +23,16 @@ updates:

# Languages.

  - directory: "/by-dataframe/dask"
    package-ecosystem: "pip"
    schedule:
      interval: "weekly"

  - directory: "/by-dataframe/pandas"
    package-ecosystem: "pip"
    schedule:
      interval: "weekly"

  - directory: "/by-language/csharp-npgsql"
    package-ecosystem: "nuget"
    schedule:
74 changes: 74 additions & 0 deletions .github/workflows/df-dask.yml
@@ -0,0 +1,74 @@
name: Dask

on:
  pull_request:
    branches: ~
    paths:
      - '.github/workflows/df-dask.yml'
      - 'by-dataframe/dask/**'
      - 'requirements.txt'
  push:
    branches: [ main ]
    paths:
      - '.github/workflows/df-dask.yml'
      - 'by-dataframe/dask/**'
      - 'requirements.txt'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:
  test:
    name: "
      Python: ${{ matrix.python-version }}
      CrateDB: ${{ matrix.cratedb-version }}
      on ${{ matrix.os }}"
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ 'ubuntu-latest' ]
        python-version: [ '3.11', '3.12' ]
        cratedb-version: [ 'nightly' ]

    services:
      cratedb:
        image: crate/crate:${{ matrix.cratedb-version }}
        ports:
          - 4200:4200
          - 5432:5432
        env:
          CRATE_HEAP_SIZE: 4g

    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
          cache: 'pip'
          cache-dependency-path: |
            requirements.txt
            by-dataframe/dask/requirements.txt
            by-dataframe/dask/requirements-dev.txt

      - name: Install utilities
        run: |
          pip install -r requirements.txt

      - name: Validate by-dataframe/dask
        run: |
          ngr test --accept-no-venv by-dataframe/dask
74 changes: 74 additions & 0 deletions .github/workflows/df-pandas.yml
@@ -0,0 +1,74 @@
name: pandas

on:
  pull_request:
    branches: ~
    paths:
      - '.github/workflows/df-pandas.yml'
      - 'by-dataframe/pandas/**'
      - 'requirements.txt'
  push:
    branches: [ main ]
    paths:
      - '.github/workflows/df-pandas.yml'
      - 'by-dataframe/pandas/**'
      - 'requirements.txt'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:
  test:
    name: "
      Python: ${{ matrix.python-version }}
      CrateDB: ${{ matrix.cratedb-version }}
      on ${{ matrix.os }}"
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ 'ubuntu-latest' ]
        python-version: [ '3.11', '3.12' ]
        cratedb-version: [ 'nightly' ]

    services:
      cratedb:
        image: crate/crate:${{ matrix.cratedb-version }}
        ports:
          - 4200:4200
          - 5432:5432
        env:
          CRATE_HEAP_SIZE: 4g

    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
          cache: 'pip'
          cache-dependency-path: |
            requirements.txt
            by-dataframe/pandas/requirements.txt
            by-dataframe/pandas/requirements-dev.txt

      - name: Install utilities
        run: |
          pip install -r requirements.txt

      - name: Validate by-dataframe/pandas
        run: |
          ngr test --accept-no-venv by-dataframe/pandas
@@ -1,16 +1,16 @@
-name: Python SQLAlchemy
+name: SQLAlchemy

on:
  pull_request:
    branches: ~
    paths:
-     - '.github/workflows/test-python-sqlalchemy.yml'
+     - '.github/workflows/sqlalchemy.yml'
      - 'by-language/python-sqlalchemy/**'
      - 'requirements.txt'
  push:
    branches: [ main ]
    paths:
-     - '.github/workflows/test-python-sqlalchemy.yml'
+     - '.github/workflows/sqlalchemy.yml'
      - 'by-language/python-sqlalchemy/**'
      - 'requirements.txt'

@@ -46,6 +46,8 @@ jobs:
        ports:
          - 4200:4200
          - 5432:5432
+       env:
+         CRATE_HEAP_SIZE: 4g

    steps:

2 changes: 1 addition & 1 deletion .gitignore
@@ -1,7 +1,7 @@
.idea
.venv*
__pycache__
-.coverage
+.coverage*
.DS_Store
coverage.xml
mlruns/
101 changes: 101 additions & 0 deletions by-dataframe/dask/README.md
@@ -0,0 +1,101 @@
# Connect to CrateDB and CrateDB Cloud using Dask


## About
Example programs demonstrating connectivity with [Dask] and [CrateDB].

This section and its examples mostly cover [DataFrame operations with SQLAlchemy],
specifically how to insert data into [CrateDB] efficiently.


## Usage

The CrateDB Python driver provides a convenience function `insert_bulk`,
which allows you to efficiently insert multiple rows of data into a CrateDB
database table in a single operation. It can be used like this:

```python
# CrateDB Cloud
# DBURI = "crate://admin:<PASSWORD>@<CLUSTERNAME>.aks1.westeurope.azure.cratedb.net:4200?ssl=true"

# CrateDB Self-Managed
DBURI = "crate://crate@localhost:4200/"

import dask.dataframe as dd
import pandas as pd
from crate.client.sqlalchemy.support import insert_bulk

# Create a Dask DataFrame from any pandas DataFrame.
df = pd.DataFrame({"value": range(100_000)})
ddf = dd.from_pandas(df, npartitions=4)

# Insert the data into CrateDB, chunk by chunk.
ddf.to_sql(
    "testdrive",
    uri=DBURI,
    index=False,
    if_exists="replace",
    chunksize=10_000,
    parallel=True,
    method=insert_bulk,
)
```
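
The same helper also works with plain pandas, since Dask's `to_sql` mirrors the
pandas interface. A minimal sketch, assuming a local CrateDB instance; the
`testdrive_pandas` table name and the synthetic DataFrame are illustrative:

```python
import pandas as pd
import sqlalchemy as sa
from crate.client.sqlalchemy.support import insert_bulk

# Connect to a CrateDB instance running on localhost.
engine = sa.create_engine("crate://crate@localhost:4200/")

# Any pandas DataFrame will do; this one is synthetic.
df = pd.DataFrame({"value": range(100_000)})

# `method=insert_bulk` routes each chunk through CrateDB's bulk operations
# endpoint, instead of emitting row-by-row INSERT statements.
df.to_sql(
    "testdrive_pandas",
    con=engine,
    index=False,
    if_exists="replace",
    chunksize=10_000,
    method=insert_bulk,
)
```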


## Setup

To start a CrateDB instance on your machine, invoke:
```shell
docker run -it --rm \
  --publish=4200:4200 --publish=5432:5432 \
  --env=CRATE_HEAP_SIZE=4g \
  crate:latest -Cdiscovery.type=single-node
```

Acquire the `cratedb-examples` repository, and set up the sandbox:
```shell
git clone https://github.com/crate/cratedb-examples
cd cratedb-examples
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Then, invoke the integration test cases:
```shell
ngr test by-dataframe/dask
```


## Examples
The `insert` example programs demonstrate efficient data loading:
```shell
time python insert_dask.py
time python insert_dask.py --mode=basic
time python insert_dask.py --mode=bulk --bulk-size=20000 --num-records=75000
```


## Connect to CrateDB Cloud

By default, the example programs will connect to CrateDB on `localhost`.
In order to connect to any other database instance, for example to [CrateDB
Cloud]:

```shell
# Use a local instance (default).
export DBURI="crate://crate@localhost:4200/"

# Use CrateDB Cloud.
export DBURI="crate://admin:<PASSWORD>@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true"
time python insert_dask.py --dburi="${DBURI}"
```
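
To verify that data has actually arrived, query the target table, for example
with CrateDB's `crash` command-line shell. A quick check, assuming the default
`testdrive_dask` table created by the example program:

```shell
pip install crash
crash --hosts localhost:4200 --command 'SELECT COUNT(*) FROM "testdrive_dask";'
```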

```{tip}
For more information, please refer to the header sections of each of the
provided example programs.
```


## Tests

To test the accompanying example programs all at once, invoke the software tests:
```shell
pytest
```
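
To exercise a single example only, pytest's keyword filtering applies as usual;
a hypothetical invocation, assuming the test cases are named after the example
modules:

```shell
pytest -k insert_dask
```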


[CrateDB]: https://github.com/crate/crate
[CrateDB Cloud]: https://console.cratedb.cloud/
[Dask]: https://www.dask.org/
[DataFrame operations with SQLAlchemy]: https://cratedb.com/docs/python/en/latest/by-example/sqlalchemy/dataframe.html
71 changes: 71 additions & 0 deletions by-dataframe/dask/insert_dask.py
@@ -0,0 +1,71 @@
"""
About
=====
Evaluate inserting data from Dask dataframes into CrateDB.
Setup
=====
::
pip install --upgrade click colorlog 'crate[sqlalchemy]' pandas
Synopsis
========
::
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate:latest
python insert_dask.py
"""
import logging

import click
import dask.dataframe as dd
import sqlalchemy as sa
from crate.client.sqlalchemy.support import insert_bulk
from dask.diagnostics import ProgressBar
from pueblo.testing.pandas import makeTimeDataFrame
from pueblo.util.logging import setup_logging

logger = logging.getLogger(__name__)

SQLALCHEMY_LOGGING = True
TABLE_NAME = "testdrive_dask"


def db_workload(dburi: str, mode: str, num_records: int, bulk_size: int):
pbar = ProgressBar()
pbar.register()

# Create example Dask DataFrame for testing purposes.
df = makeTimeDataFrame(nper=num_records, freq="S")
ddf = dd.from_pandas(df, npartitions=4)

# Save DataFrame into CrateDB efficiently.

# Works. Takes ~3 seconds.
if mode == "basic":
ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True)

# Works. Takes ~10 seconds.
elif mode == "multi":
ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method="multi")

# Works. Takes ~2 seconds.
elif mode == "bulk":
ddf.to_sql(TABLE_NAME, uri=dburi, index=False, if_exists="replace", chunksize=bulk_size, parallel=True, method=insert_bulk)


@click.command()
@click.option("--dburi", type=str, default="crate://localhost:4200", required=False, help="SQLAlchemy database connection URI.")
@click.option("--mode", type=click.Choice(['basic', 'multi', 'bulk']), default="bulk", required=False, help="Insert mode.")
@click.option("--num-records", type=int, default=125_000, required=False, help="Number of records to insert.")
@click.option("--bulk-size", type=int, default=10_000, required=False, help="Bulk size / chunk size.")
@click.help_option()
def main(dburi: str, mode: str, num_records: int, bulk_size: int):
setup_logging()
db_workload(dburi=dburi, mode=mode, num_records=num_records, bulk_size=bulk_size)


if __name__ == "__main__":
main()