Skip to content

Commit

Permalink
MongoDB CDC: Add support for MongoDB Change Streams to ctk load table
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 19, 2024
1 parent 731e007 commit 5099dd6
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- `ctk load table`: Added support for MongoDB Change Streams

## 2024/07/08 v0.0.15
- IO: Added the `if-exists` query parameter by updating to influxio 0.4.0.
Expand Down
16 changes: 11 additions & 5 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ def load_table(self, resource: InputOutputResource, target: TableAddress):
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("mongodb"):
from cratedb_toolkit.io.mongodb.api import mongodb_copy
if "+cdc" in source_url:
source_url = source_url.replace("+cdc", "")
from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
mongodb_relay_cdc(source_url, target_url, progress=True)
else:
from cratedb_toolkit.io.mongodb.api import mongodb_copy

if not mongodb_copy(source_url, target_url, progress=True):
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
else:
raise NotImplementedError("Importing resource not implemented yet")
39 changes: 39 additions & 0 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import logging

from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
Expand Down Expand Up @@ -68,3 +69,41 @@ def mongodb_copy(source_url, target_url, progress: bool = False):
cr8_insert_json(infile=buffer, hosts=cratedb_address.httpuri, table=cratedb_table_address.fullname)

return True


def mongodb_relay_cdc(source_url, target_url, progress: bool = False):
    """
    Relay the change stream of a MongoDB collection into a CrateDB table.

    Both URLs are decoded with `DatabaseAddress`; the schema/table parts of
    the source address designate the MongoDB database/collection, and the
    table address of the target designates the CrateDB table to write to.

    :param source_url: MongoDB URL, including database and collection.
    :param target_url: CrateDB SQLAlchemy URL, including the target table.
    :param progress: Accepted for signature parity with `mongodb_copy`;
        not evaluated by this function.

    Synopsis
    --------
    export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo-cdc
    ctk load table mongodb+cdc://localhost:27017/testdrive/demo

    Backlog
    -------
    TODO: Run on multiple collections.
    TODO: Run on the whole database.
    TODO: Accept parameters like `if_exists="append,replace"`.
    TODO: Propagate parameters like `scan="full"`.
    """
    logger.info("Running MongoDB CDC relay")

    # Split each URL into a connection URI and a schema/table address.
    source_uri, collection_address = DatabaseAddress.from_string(source_url).decode()
    target_uri, table_address = DatabaseAddress.from_string(target_url).decode()

    # Wire source and sink together, then hand over control to the relay.
    MongoDBCDCRelayCrateDB(
        mongodb_url=str(source_uri),
        mongodb_database=collection_address.schema,
        mongodb_collection=collection_address.table,
        cratedb_sqlalchemy_url=str(target_uri),
        cratedb_table=table_address.fullname,
    ).start()
62 changes: 62 additions & 0 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
Basic relaying of a MongoDB Change Stream into a CrateDB table.
Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
"""

import logging

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB

from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class MongoDBCDCRelayCrateDB:
    """
    Relay MongoDB Change Stream into CrateDB table.

    The relay watches a single MongoDB collection, converts each change
    event into an SQL statement using `MongoDBCDCTranslatorCrateDB`, and
    executes that statement on a CrateDB connection.
    """

    def __init__(
        self,
        mongodb_url: str,
        mongodb_database: str,
        mongodb_collection: str,
        cratedb_sqlalchemy_url: str,
        cratedb_table: str,
    ):
        """
        Set up both database clients and the event translator.

        :param mongodb_url: Connection URL handed to `pymongo.MongoClient`.
        :param mongodb_database: Name of the MongoDB database to watch.
        :param mongodb_collection: Name of the MongoDB collection to watch.
        :param cratedb_sqlalchemy_url: SQLAlchemy URL of the CrateDB target.
        :param cratedb_table: Name of the CrateDB table to write to.
        """
        # NOTE(review): `echo=True` presumably makes the adapter log each SQL
        # statement -- confirm against `DatabaseAdapter`.
        self.cratedb_adapter = DatabaseAdapter(cratedb_sqlalchemy_url, echo=True)
        self.mongodb_client: pymongo.MongoClient = pymongo.MongoClient(mongodb_url)
        self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
        # The translator receives the relation name as quoted by the adapter.
        self.table_name = self.cratedb_adapter.quote_relation_name(cratedb_table)
        self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)

    def start(self):
        """
        Subscribe to change stream events, convert to SQL, and submit to CrateDB.

        The translator's DDL statement is executed once up front. Afterwards,
        every non-empty statement emitted by `cdc_to_sql` is executed, followed
        by a `REFRESH TABLE`. Because `cdc_to_sql` loops forever, this method
        does not return unless an exception propagates.
        """
        with self.cratedb_adapter.engine.connect() as connection:
            connection.execute(sa.text(self.cdc.sql_ddl))
            for sql in self.cdc_to_sql():
                # Skip events for which the translator produced no statement.
                if sql:
                    connection.execute(sa.text(sql))
                    connection.execute(sa.text(f"REFRESH TABLE {self.table_name};"))

    def cdc_to_sql(self):
        """
        Subscribe to change stream events, and emit corresponding SQL statements.

        This is a generator that never terminates on its own: when iteration
        over a change stream ends, a new one is opened.
        """
        # Note that `.watch()` will block until events are ready for consumption, so
        # this is not a busy loop. Also note that the routine doesn't perform any sensible
        # error handling yet.
        # NOTE(review): re-subscribing does not pass a resume token, so events
        # occurring between two subscriptions are presumably missed -- confirm.
        while True:
            with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
                for change in change_stream:
                    # Pass the event as a logging argument, so formatting only
                    # happens when the record is actually emitted.
                    logger.info("MongoDB Change Stream event: %s", change)
                    yield self.cdc.to_sql(change)
9 changes: 6 additions & 3 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ class TableAddress:

@property
def fullname(self):
    """
    Return the address as a double-quoted, optionally schema-qualified name.

    Leading and trailing double quotes on `schema` and `table` are removed
    before quoting, so names which are already quoted are not quoted twice.

    NOTE(review): double quotes embedded inside a name are not escaped.

    :return: `"schema"."table"` when a schema is set, otherwise `"table"`.
    :raises ValueError: When `table` is None.
    """
    if self.table is None:
        raise ValueError("Uninitialized table address can not be serialized")
    table = self.table.strip('"')
    if self.schema and self.table:
        schema = self.schema.strip('"')
        return f'"{schema}"."{table}"'
    return f'"{table}"'


@dataclasses.dataclass
Expand Down
Loading

0 comments on commit 5099dd6

Please sign in to comment.