Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ctk load table: Add support for MongoDB Change Streams #196

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- `ctk load table`: Added support for MongoDB Change Streams

## 2024/07/08 v0.0.15
- IO: Added the `if-exists` query parameter by updating to influxio 0.4.0.
Expand Down
16 changes: 11 additions & 5 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("mongodb"):
from cratedb_toolkit.io.mongodb.api import mongodb_copy
if "+cdc" in source_url:
source_url = source_url.replace("+cdc", "")
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
mongodb_relay_cdc(source_url, target_url, progress=True)
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
else:
raise NotImplementedError("Importing resource not implemented yet")
39 changes: 39 additions & 0 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import logging

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
Expand Down Expand Up @@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

return True


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
"""
Synopsis
--------
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc
ctk load table mongodb+cdc://localhost:27017/testdrive/demo

Backlog
-------
TODO: Run on multiple collections.
TODO: Run on the whole database.
TODO: Accept parameters like `if_exists="append,replace"`.
TODO: Propagate parameters like `scan="full"`.
"""
logger.info("Running MongoDB CDC relay")

# Decode database URL.
mongodb_address = DatabaseAddress.from_string(source_url)
mongodb_uri, mongodb_collection_address = mongodb_address.decode()
mongodb_database = mongodb_collection_address.schema
mongodb_collection = mongodb_collection_address.table

cratedb_address = DatabaseAddress.from_string(target_url)
cratedb_uri, cratedb_table_address = cratedb_address.decode()

# Configure machinery.
relay = MongoDBCDCRelayCrateDB(
mongodb_url=str(mongodb_uri),
mongodb_database=mongodb_database,
mongodb_collection=mongodb_collection,
cratedb_sqlalchemy_url=str(cratedb_uri),
cratedb_table=cratedb_table_address.fullname,
)

# Invoke machinery.
relay.start()
61 changes: 61 additions & 0 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Basic relaying of a MongoDB Change Stream into CrateDB table.

Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
"""

import logging

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB

from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class MongoDBCDCRelayCrateDB:
"""
Relay MongoDB Change Stream into CrateDB table.
"""

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_sqlalchemy_url: str,
cratedb_table: str,
):
self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)

def start(self):
"""
Subscribe to change stream events, convert to SQL, and submit to CrateDB.
"""
# FIXME: Note that the function does not perform any sensible error handling yet.
with self.cratedb_adapter.engine.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))

def cdc_to_sql(self):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
"""
# Note that `.watch()` will block until events are ready for consumption, so
# this is not a busy loop.
# FIXME: Note that the function does not perform any sensible error handling yet.
while True:
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
for change in change_stream:
yield self.cdc.to_sql(change)
156 changes: 156 additions & 0 deletions doc/io/mongodb/cdc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
(mongodb-cdc-relay)=
# MongoDB CDC Relay

## About
Relay a [MongoDB Change Stream] into a [CrateDB] table using a one-stop command
`ctk load table mongodb+cdc://...`, or `mongodb+srv+cdc://` for MongoDB Atlas.

You can use it in order to facilitate convenient data transfers to be used
within data pipelines or ad hoc operations. It can be used as a CLI interface,
and as a library.


## Install
```shell
pip install --upgrade 'cratedb-toolkit[mongodb]'
```

:::{tip}
The tutorial also uses the programs `crash`, `mongosh`, and `atlas`. `crash`
will be installed with CrateDB Toolkit, but `mongosh` and `atlas` must be
installed by other means. If you are using Docker anyway, please use those
command aliases to provide them to your environment without actually needing
to install them.

```shell
alias mongosh='docker run -i --rm --network=host mongo:7 mongosh'
```

The `atlas` program needs to store authentication information between invocations,
therefore you need to supply a storage volume.
```shell
mkdir atlas-config
alias atlas='docker run --rm -it --volume=$(pwd)/atlas-config:/root mongodb/atlas atlas'
```
:::


## Usage

(mongodb-cdc-workstation)=
### Workstation
The guidelines assume that both services, CrateDB and MongoDB, are listening on
`localhost`.
Please find guidelines how to provide them on your workstation using
Docker or Podman in the {ref}`mongodb-services-standalone` section.
```shell
export MONGODB_URL=mongodb://localhost/testdrive
export MONGODB_URL_CTK=mongodb+cdc://localhost/testdrive/demo
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost/testdrive/demo-cdc
ctk load table "${MONGODB_URL_CTK}"
```

Insert document into MongoDB collection, and update it.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})'
mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })'
```

Query data in CrateDB.
```shell
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

Invoke a delete operation, and check data in CrateDB once more.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.deleteOne({"foo": "bar"})'
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

(mongodb-cdc-cloud)=
### Cloud
The guidelines assume usage of cloud variants for both services, CrateDB Cloud
and MongoDB Atlas.
Please find guidelines how to provision relevant cloud resources in the
{ref}`mongodb-services-cloud` section. You will need authentication credentials
from this step for the next one.

:::{rubric} Invoke pipeline
:::
A canonical invocation for ingesting MongoDB Atlas Change Streams into
CrateDB Cloud.

```shell
export MONGODB_URL=mongodb+srv://user:password@testdrive.jaxmmfp.mongodb.net/testdrive
export MONGODB_URL_CTK=mongodb+srv+cdc://user:password@testdrive.jaxmmfp.mongodb.net/testdrive/demo
export CRATEDB_HTTP_URL="https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/"
export CRATEDB_SQLALCHEMY_URL="crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/testdrive/demo-cdc?ssl=true"
```
```shell
ctk load table "${MONGODB_URL_CTK}"
```

:::{note}
Please note the `mongodb+srv://` and `mongodb+srv+cdc://` URL schemes, and the
`ssl=true` query parameter. Both are needed to establish connectivity with
MongoDB Atlas and CrateDB.
:::

:::{rubric} Trigger CDC events
:::
Inserting a document into the MongoDB collection, and updating it, will trigger two CDC events.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})'
mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })'
```

:::{rubric} Query data in CrateDB
:::
```shell
crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-cdc";'
```


## Appendix
A few operations that are handy when exploring this exercise.

### Database Services
Provide CrateDB and MongoDB services.
- See {ref}`mongodb-services`.

### Database Operations

Query records in CrateDB table.
```shell
crash --command 'SELECT * FROM "testdrive"."demo-cdc";'
```

Truncate CrateDB table.
```shell
crash --command 'DELETE FROM "testdrive"."demo-cdc";'
```

Query documents in MongoDB collection.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.find()'
```

Truncate MongoDB collection.
```shell
mongosh "${MONGODB_URL}" --eval 'db.demo.drop()'
```


## Backlog
:::{todo}
- Improve general CLI UX/DX, for example by using `ctk shell`.
- Provide [SDK and CLI for CrateDB Cloud Cluster APIs], for improving Cloud DX.
:::


[commons-codec]: https://pypi.org/project/commons-codec/
[CrateDB]: https://cratedb.com/docs/guide/home/
[CrateDB Cloud]: https://cratedb.com/docs/cloud/
[MongoDB Atlas]: https://www.mongodb.com/atlas
[MongoDB Change Stream]: https://www.mongodb.com/docs/manual/changeStreams/
[SDK and CLI for CrateDB Cloud Cluster APIs]: https://github.com/crate-workbench/cratedb-toolkit/pull/81
11 changes: 10 additions & 1 deletion doc/io/mongodb/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,14 @@ Using the MongoDB subsystem, you can transfer data from and to MongoDB.
:maxdepth: 1

loader
migr8
cdc
```

:::{note}
The MongoDB Table Loader is an improvement of the traditional {doc}`migr8`.
:::

:::{tip}
In order to learn how to provide CrateDB and MongoDB services optimally, to
evaluate this subsystem, see {ref}`mongodb-services`.
:::
6 changes: 5 additions & 1 deletion doc/io/mongodb/migr8.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
orphan: true
---

(migr8)=
# migr8
# migr8 migration utility

## About

Expand Down
Loading