Skip to content

Commit

Permalink
MongoDB: Improve UX by using ctk load table mongodb://...
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Nov 13, 2023
1 parent 3c0513c commit 0696197
Show file tree
Hide file tree
Showing 29 changed files with 512 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/mongodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[io,test,develop]
pip install --use-pep517 --prefer-binary --editable=.[mongodb,test,develop]
- name: Run linter and software tests
run: |
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
__pycache__
*.pyc
dist
.coverage
.coverage*
coverage.xml
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
- Add SQL runner utility primitives to `io.sql` namespace
- Add `import_csv_pandas` and `import_csv_dask` utility primitives
- InfluxDB: Add adapter for `influxio`
- Add `migr8` program from previous repository
- MongoDB: Add `migr8` program from previous repository
- MongoDB: Improve UX by using `ctk load table mongodb://...`


## 2023/11/06 v0.0.2
Expand Down
40 changes: 40 additions & 0 deletions cratedb_toolkit/io/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,45 @@ ctk show table "testdrive.demo"
```


## MongoDB

Using the MongoDB subsystem, you can transfer data from MongoDB to CrateDB.

Import two data points into MongoDB.
```shell
mongosh mongodb://localhost:27017/testdrive <<EOF
db.demo.remove({})
db.demo.insertMany([
{
timestamp: new Date(1556896326),
region: "amazonas",
temperature: 42.42,
humidity: 84.84,
},
{
timestamp: new Date(1556896327),
region: "amazonas",
temperature: 45.89,
humidity: 77.23,
windspeed: 5.4,
},
])
db.demo.find({})
EOF
```

Todo: Use `mongoimport`.
```shell
mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority'
```

Transfer data.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
crash --command "SELECT * FROM testdrive.demo;"
```


[CrateDB Cloud]: https://console.cratedb.cloud/
[influxio]: https://github.com/daq-tools/influxio
42 changes: 34 additions & 8 deletions cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from cratedb_toolkit.cluster.util import get_cluster_info
from cratedb_toolkit.io.croud import CloudIo
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cli import boot_click, make_command
from cratedb_toolkit.util.croud import CroudException

Expand Down Expand Up @@ -70,21 +71,35 @@ def error(self):
@click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)
@click.option("--cratedb-http-url", envvar="CRATEDB_HTTP_URL", type=str, required=False, help="CrateDB HTTP URL")
@click.option("--table", type=str, required=False, help="Table name where to import the io")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.pass_context
def load_table(ctx: click.Context, url: str, cluster_id: str, cratedb_sqlalchemy_url: str, table: str, format_: str):
def load_table(
ctx: click.Context,
url: str,
cluster_id: str,
cratedb_sqlalchemy_url: str,
cratedb_http_url: str,
table: str,
format_: str,
):
"""
Import data into CrateDB and CrateDB Cloud clusters.
"""

if not cluster_id and not cratedb_sqlalchemy_url:
if not cluster_id and not cratedb_sqlalchemy_url and not cratedb_http_url:
raise KeyError(
"Either CrateDB Cloud Cluster identifier or CrateDB SQLAlchemy URL needs to be supplied. "
"Use --cluster-id / --cratedb-sqlalchemy-url CLI options "
"or CRATEDB_CLOUD_CLUSTER_ID / CRATEDB_SQLALCHEMY_URL environment variables."
"Either CrateDB Cloud Cluster identifier or CrateDB SQLAlchemy or HTTP URL needs to be supplied. "
"Use --cluster-id / --cratedb-sqlalchemy-url / --cratedb-http-url CLI options "
"or CRATEDB_CLOUD_CLUSTER_ID / CRATEDB_SQLALCHEMY_URL / CRATEDB_HTTP_URL environment variables."
)

# When SQLAlchemy URL is not given, but HTTP URL is, compute the former on demand.
if not cratedb_sqlalchemy_url and cratedb_http_url:
cratedb_sqlalchemy_url = DatabaseAddress.from_httpuri(cratedb_http_url).dburi

# Dispatch "load table" operation.
if cluster_id:
load_table_cloud(cluster_id, url)
elif cratedb_sqlalchemy_url:
Expand Down Expand Up @@ -125,14 +140,25 @@ def load_table_cloud(cluster_id: str, resource_url: str):

def load_table_cratedb(sqlalchemy_url: str, resource_url: str):
"""
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = resource_url
target_url = sqlalchemy_url
if resource_url.startswith("influxdb"):
from cratedb_toolkit.io.influxdb import influxdb_copy

source_url = resource_url.replace("influxdb2", "http")
target_url = sqlalchemy_url
influxdb_copy(source_url, target_url, progress=True)
source_url = source_url.replace("influxdb2://", "http://")
if not influxdb_copy(source_url, target_url, progress=True):
sys.exit(1)
elif resource_url.startswith("mongodb"):
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
sys.exit(1)
else:
raise NotImplementedError("Importing resource not implemented yet")
12 changes: 12 additions & 0 deletions cratedb_toolkit/io/influxdb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import logging

from influxio.core import copy

logger = logging.getLogger(__name__)


def influxdb_copy(source_url, target_url, progress: bool = False):
"""
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
"""
logger.info("Running InfluxDB copy")
copy(source_url, target_url, progress=progress)
return True
27 changes: 24 additions & 3 deletions cratedb_toolkit/io/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
A utility program, called `migr8`, supporting data migrations
between MongoDB and CrateDB.

A one-stop command `ctk load table mongodb://...`, wrapping the `migr8`
steps into a complete pipeline, to facilitate convenient data transfers.


## About

Expand Down Expand Up @@ -32,22 +35,40 @@ client driver library to version 3, like `pip install 'pymongo<4'`.

Use `pip` to install the package from PyPI.
```shell
pip install --upgrade 'cratedb-toolkit[io]'
pip install --upgrade 'cratedb-toolkit[mongodb]'
```

To verify if the installation worked, invoke:
```shell
migr8 --version
migr8 --help
ctk --version
```


## Usage

`ctk load table` is your one-stop command to populate a CrateDB table from a
MongoDB collection.

```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
```

It will run `extract` and `translate` to gather the SQL DDL schema, and will
invoke `export` and `cr8` to actually transfer data.


## Usage for `migr8`

The program `migr8` offers three subcommands (`extract`, `translate`, `export`),
to conclude data transfers from MongoDB to CrateDB. Please read this section
carefully to learn how they can be used successfully.

```shell
migr8 --version
migr8 --help
```

### Schema Extraction

To extract a description of the schema of a collection, use the
Expand Down
70 changes: 70 additions & 0 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import argparse
import logging

from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
from cratedb_toolkit.util.database import DatabaseAdapter

logger = logging.getLogger(__name__)


def mongodb_copy(source_url, target_url, progress: bool = False):
"""
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
Backlog
-------
TODO: Run on multiple collections.
TODO: Run on the whole database.
TODO: Accept parameters like `if_exists="append,replace"`.
TODO: Propagate parameters like `scan="full"`.
TODO: Handle timestamp precision(s)?
"""
logger.info("Running MongoDB copy")

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

# 1. Extract schema from MongoDB collection.
logger.info(f"Extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
extract_args = argparse.Namespace(
url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection, scan="full"
)
mongodb_schema = extract(extract_args)
count = mongodb_schema[mongodb_collection]["count"]
if not count > 0:
logger.error(f"No results when extracting schema from MongoDB: {mongodb_database}.{mongodb_collection}")
return False

# 2. Translate schema to SQL DDL.
cratedb_address = DatabaseAddress.from_string(target_url)
cratedb_uri, cratedb_table_address = cratedb_address.decode()
ddl = translate(mongodb_schema, schemaname=cratedb_table_address.schema)

# 3. Load schema SQL DDL into CrateDB.
cratedb = DatabaseAdapter(dburi=str(cratedb_uri))
for collection, query in ddl.items():
logger.info(f"Creating table for collection '{collection}': {query}")
cratedb.run_sql(query)

# 4. Transfer data to CrateDB.
"""
migr8 export --host localhost --port 27017 --database test_db --collection test | \
cr8 insert-json --hosts localhost:4200 --table test
"""
logger.info(
f"Transferring data from MongoDB to CrateDB: "
f"source={mongodb_collection_address.fullname}, target={cratedb_table_address.fullname}"
)
export_args = argparse.Namespace(url=str(mongodb_uri), database=mongodb_database, collection=mongodb_collection)
buffer = export(export_args)
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

return True
13 changes: 5 additions & 8 deletions cratedb_toolkit/io/mongodb/cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import argparse
import json

import pymongo
import rich
from bson.raw_bson import RawBSONDocument

from cratedb_toolkit import __version__
from cratedb_toolkit.io.mongodb.core import extract, translate

from .export import export
from cratedb_toolkit.io.mongodb.core import export, extract, translate


def extract_parser(subargs):
parser = subargs.add_parser("extract", help="Extract a schema from a MongoDB database")
parser.add_argument("--url", default="mongodb://localhost:27017", help="MongoDB URL")
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
parser.add_argument("--database", required=True, help="MongoDB database")
Expand All @@ -35,6 +32,7 @@ def translate_parser(subargs):

def export_parser(subargs):
parser = subargs.add_parser("export", help="Export a MongoDB collection as plain JSON")
parser.add_argument("--url", default="mongodb://localhost:27017", help="MongoDB URL")
parser.add_argument("--collection", required=True)
parser.add_argument("--host", default="localhost", help="MongoDB host")
parser.add_argument("--port", default=27017, help="MongoDB port")
Expand Down Expand Up @@ -80,12 +78,11 @@ def translate_from_file(args):


def export_to_stdout(args):
client = pymongo.MongoClient(args.host, int(args.port), document_class=RawBSONDocument)
db = client[args.database]
export(db[args.collection])
export(args)


def main():
rich.print("\n[green bold]MongoDB[/green bold] -> [blue bold]CrateDB[/blue bold] Exporter :: Schema Extractor\n\n")
args = get_args()
if args.command == "extract":
extract_to_file(args)
Expand Down
Loading

0 comments on commit 0696197

Please sign in to comment.