From 5099dd60231d73d67c6ef8d3eab403acf969f7e2 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 17 Jul 2024 16:46:21 +0200 Subject: [PATCH] MongoDB CDC: Add support for MongoDB Change Streams to `ctk load table` --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 16 +- cratedb_toolkit/io/mongodb/api.py | 39 +++++ cratedb_toolkit/io/mongodb/cdc.py | 62 +++++++ cratedb_toolkit/model.py | 9 +- doc/io/mongodb/cdc.md | 268 ++++++++++++++++++++++++++++++ doc/io/mongodb/index.md | 6 +- doc/io/mongodb/migr8.md | 6 +- pyproject.toml | 1 + 9 files changed, 398 insertions(+), 10 deletions(-) create mode 100644 cratedb_toolkit/io/mongodb/cdc.py create mode 100644 doc/io/mongodb/cdc.md diff --git a/CHANGES.md b/CHANGES.md index f8a0d44f..e4681e7f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- `ctk load table`: Added support for MongoDB Change Streams ## 2024/07/08 v0.0.15 - IO: Added the `if-exists` query parameter by updating to influxio 0.4.0. diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index 84a84243..5ea2e5fe 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress): logger.error(msg) raise OperationFailed(msg) elif source_url.startswith("mongodb"): - from cratedb_toolkit.io.mongodb.api import mongodb_copy + if "+cdc" in source_url: + source_url = source_url.replace("+cdc", "") + from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc - if not mongodb_copy(source_url, target_url, progress=True): - msg = "Data loading failed" - logger.error(msg) - raise OperationFailed(msg) + mongodb_relay_cdc(source_url, target_url, progress=True) + else: + from cratedb_toolkit.io.mongodb.api import mongodb_copy + + if not mongodb_copy(source_url, target_url, progress=True): + msg = "Data loading failed" + logger.error(msg) + raise OperationFailed(msg) else: raise NotImplementedError("Importing resource not implemented yet") diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 46e163f0..a8f73bfb 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -1,6 +1,7 @@ import argparse import logging +from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB from cratedb_toolkit.io.mongodb.core import export, extract, translate from cratedb_toolkit.model import DatabaseAddress from cratedb_toolkit.util.cr8 import cr8_insert_json @@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False): cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname) return True + + +def mongodb_relay_cdc(source_url, target_url, progress: bool = False): + """ + Synopsis + -------- + export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc + ctk load table mongodb+cdc://localhost:27017/testdrive/demo + + Backlog + ------- + TODO: Run on multiple collections. + TODO: Run on the whole database. + TODO: Accept parameters like `if_exists="append,replace"`. + TODO: Propagate parameters like `scan="full"`. + """ + logger.info("Running MongoDB CDC relay") + + # Decode database URL. + mongodb_address = DatabaseAddress.from_string(source_url) + mongodb_uri, mongodb_collection_address = mongodb_address.decode() + mongodb_database = mongodb_collection_address.schema + mongodb_collection = mongodb_collection_address.table + + cratedb_address = DatabaseAddress.from_string(target_url) + cratedb_uri, cratedb_table_address = cratedb_address.decode() + + # Configure machinery. + relay = MongoDBCDCRelayCrateDB( + mongodb_url=str(mongodb_uri), + mongodb_database=mongodb_database, + mongodb_collection=mongodb_collection, + cratedb_sqlalchemy_url=str(cratedb_uri), + cratedb_table=cratedb_table_address.fullname, + ) + + # Invoke machinery. + relay.start() diff --git a/cratedb_toolkit/io/mongodb/cdc.py b/cratedb_toolkit/io/mongodb/cdc.py new file mode 100644 index 00000000..83c7958d --- /dev/null +++ b/cratedb_toolkit/io/mongodb/cdc.py @@ -0,0 +1,62 @@ +""" +Basic relaying of a MongoDB Change Stream into CrateDB table. + +Documentation: +- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md +- https://www.mongodb.com/docs/manual/changeStreams/ +- https://www.mongodb.com/developer/languages/python/python-change-streams/ +""" + +import logging + +import pymongo +import sqlalchemy as sa +from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB + +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class MongoDBCDCRelayCrateDB: + """ + Relay MongoDB Change Stream into CrateDB table. + """ + + def __init__( + self, + mongodb_url: str, + mongodb_database: str, + mongodb_collection: str, + cratedb_sqlalchemy_url: str, + cratedb_table: str, + ): + self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True) + self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url) + self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] + self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table) + self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) + + def start(self): + """ + Subscribe to change stream events, convert to SQL, and submit to CrateDB. + """ + with self.cratedb_adapter.engine.connect() as connection: + connection.execute(sa.text(self.cdc.sql_ddl)) + for sql in self.cdc_to_sql(): + if sql: + connection.execute(sa.text(sql)) + connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) + + def cdc_to_sql(self): + """ + Subscribe to change stream events, and emit corresponding SQL statements. + """ + # Note that `.watch()` will block until events are ready for consumption, so + # this is not a busy loop. Also note that the routine doesn't perform any sensible + # error handling yet. + while True: + with self.mongodb_collection.watch(full_document="updateLookup") as change_stream: + for change in change_stream: + logger.info("MongoDB Change Stream event: %s" % change) + yield self.cdc.to_sql(change) diff --git a/cratedb_toolkit/model.py b/cratedb_toolkit/model.py index 754bb48f..c4c4dad5 100644 --- a/cratedb_toolkit/model.py +++ b/cratedb_toolkit/model.py @@ -85,12 +85,15 @@ class TableAddress: @property def fullname(self): - if self.schema is None and self.table is None: + if self.table is None: raise ValueError("Uninitialized table address can not be serialized") if self.schema and self.table: - return f'"{self.schema}"."{self.table}"' + schema = self.schema.strip('"') + table = self.table.strip('"') + return f'"{schema}"."{table}"' else: - return f'"{self.table}"' + table = self.table.strip('"') + return f'"{table}"' @dataclasses.dataclass diff --git a/doc/io/mongodb/cdc.md b/doc/io/mongodb/cdc.md new file mode 100644 index 00000000..f4c8a0ab --- /dev/null +++ b/doc/io/mongodb/cdc.md @@ -0,0 +1,268 @@ +(mongodb-cdc-relay)= +# MongoDB CDC Relay + +## About +Relay a [MongoDB Change Stream] into a [CrateDB] table using a one-stop command +`ctk load table mongodb+cdc://...`, or `mongodb+srv+cdc://` for MongoDB Atlas. + +You can use it in order to facilitate convenient data transfers to be used +within data pipelines or ad hoc operations. It can be used as a CLI interface, +and as a library. + + +## Install +```shell +pip install --upgrade 'cratedb-toolkit[mongodb]' +``` + +:::{tip} +The tutorial also uses the programs `crash`, `mongosh`, and `atlas`. `crash` +will be installed with CrateDB Toolkit, but `mongosh` and `atlas` must be +installed by other means. If you are using Docker anyway, please use those +command aliases to provide them to your environment without actually needing +to install them. + +```shell +alias mongosh='docker run -i --rm --network=host mongo:7 mongosh' +``` + +The `atlas` program needs to store authentication information between invocations, +therefore you need to supply a storage volume. +```shell +mkdir atlas-config +alias atlas='docker run --rm -it --volume=$(pwd)/atlas-config:/root mongodb/atlas atlas' +``` +::: + + +## Usage + +(mongodb-cdc-workstation)= +### Workstation +The guidelines assume that both services, CrateDB and MongoDB, are listening on +`localhost`. +Please find guidelines how to provide them on your workstation using +Docker or Podman in the {ref}`mongodb-cdc-services-standalone` section below. +```shell +export MONGODB_URL=mongodb://localhost/testdrive +export MONGODB_URL_CTK=mongodb+cdc://localhost/testdrive/demo +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost/testdrive/demo-cdc +ctk load table "${MONGODB_URL_CTK}" +``` + +Insert document into MongoDB collection, and update it. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})' +mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })' +``` + +Query data in CrateDB. +```shell +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +Invoke a delete operation, and check data in CrateDB once more. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.deleteOne({"foo": "bar"})' +crash --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + +(mongodb-cdc-cloud)= +### Cloud +The guidelines assume usage of cloud variants for both services, CrateDB Cloud +and MongoDB Atlas. +Please find guidelines how to provision relevant cloud resources +in the {ref}`mongodb-cdc-services-cloud` section below. + +:::{rubric} Invoke pipeline +::: +A canonical invocation for ingesting MongoDB Atlas Change Streams into +CrateDB Cloud. + +```shell +export MONGODB_URL=mongodb+srv://user:password@testdrive.jaxmmfp.mongodb.net/testdrive +export MONGODB_URL_CTK=mongodb+srv+cdc://user:password@testdrive.jaxmmfp.mongodb.net/testdrive/demo +export CRATEDB_HTTP_URL="https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/" +export CRATEDB_SQLALCHEMY_URL="crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/testdrive/demo-cdc?ssl=true" +``` +```shell +ctk load table "${MONGODB_URL_CTK}" +``` + +:::{note} +Please note the `mongodb+srv://` and `mongodb+srv+cdc://` URL schemes, and the +`ssl=true` query parameter. Both are needed to establish connectivity with +MongoDB Atlas and CrateDB. +::: + +:::{rubric} Trigger CDC events +::: +Inserting a document into the MongoDB collection, and updating it, will trigger two CDC events. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.insertOne({"foo": "bar"})' +mongosh "${MONGODB_URL}" --eval 'db.demo.updateOne({"foo": "bar"}, { $set: { status: "D" } })' +``` + +:::{rubric} Query data in CrateDB +::: +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command 'SELECT * FROM "testdrive"."demo-cdc";' +``` + + +## Appendix + +### Database Operations +A few operations that are handy when exploring this exercise. + +Reset MongoDB collection. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.drop()' +``` + +Reset CrateDB table. +```shell +crash --command 'DELETE FROM "testdrive"."demo-cdc";' +``` + +Display documents in MongoDB collection. +```shell +mongosh "${MONGODB_URL}" --eval 'db.demo.find()' +``` + +(mongodb-cdc-services-standalone)= +### Standalone Services +Quickly start CrateDB and MongoDB using Docker or Podman. + +#### CrateDB +Start CrateDB. +```shell +docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \ + crate:5.7 -Cdiscovery.type=single-node +``` + +#### MongoDB +Start MongoDB. +Please note that change streams are only available for replica sets and +sharded clusters, so let's define a replica set by using the +`--replSet rs-testdrive` option when starting the MongoDB server. +```shell +docker run -it --rm --name=mongodb --publish=27017:27017 \ + mongo:7 mongod --replSet rs-testdrive +``` + +Now, initialize the replica set, by using the `mongosh` command to invoke +the `rs.initiate()` operation. +```shell +export MONGODB_URL="mongodb://localhost/" +docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <=2", ] mongodb = [ + "commons-codec[mongodb]==0.0.2", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",