Skip to content

Commit

Permalink
Add transformer for MongoDB CDC to CrateDB SQL conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 16, 2024
1 parent f5975b1 commit ab24a07
Show file tree
Hide file tree
Showing 8 changed files with 632 additions and 6 deletions.
64 changes: 60 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: "Tests: PR"
name: "Tests"

on:
push:
Expand All @@ -11,9 +11,10 @@ concurrency:
cancel-in-progress: true

jobs:
test:

test-vanilla:
name: "
Python: ${{ matrix.python-version }}
Vanilla: Python ${{ matrix.python-version }}
"
runs-on: ${{ matrix.os }}
strategy:
Expand Down Expand Up @@ -60,7 +61,62 @@ jobs:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: main
flags: vanilla
env_vars: OS,PYTHON
name: codecov-umbrella
fail_ci_if_error: true


test-mongodb:
name: "
MongoDB: Python ${{ matrix.python-version }}
"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: ['ubuntu-latest']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']

env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}

steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path:
pyproject.toml

- name: Set up project
run: |
# `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
pip install "setuptools>=64" --upgrade
# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb]
- name: Run linters and software tests
run: poe check

# https://github.com/codecov/codecov-action
- name: Upload coverage results to Codecov
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: mongodb
env_vars: OS,PYTHON
name: codecov-umbrella
fail_ci_if_error: true
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added transformer for MongoDB CDC to CrateDB SQL conversion

## 2024/07/16 v0.0.1
- Added decoders for Airrohr, Tasmota, and TTS/TTN from Kotori DAQ
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ To install the most recent version, run:
pip install --upgrade commons-codec
```

## Usage
In order to learn how to use the library, please visit the [documentation],
and explore the source code or its [examples].

## License
The project uses the LGPLv3 license for the whole ensemble. However, individual
portions of the code base are vendorized from other Python packages, where
Expand All @@ -50,6 +54,8 @@ both libraries' ingredients don't have anything in common, yet.

[Apache Commons Codec]: https://commons.apache.org/proper/commons-codec/
[commons-codec]: https://pypi.org/project/commons-codec/
[documentation]: https://github.com/daq-tools/commons-codec/tree/main/docs
[examples]: https://github.com/daq-tools/commons-codec/tree/main/examples
[Kotori]: https://github.com/daq-tools/kotori
[LorryStream]: https://github.com/daq-tools/lorrystream/
[PyPI]: https://pypi.org/
89 changes: 89 additions & 0 deletions docs/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Relay MongoDB Change Stream into CrateDB table

## Introduction

> Change streams allow applications to access real-time data changes without the prior
> complexity and risk of manually tailing the oplog. Applications can use change streams
> to subscribe to all data changes on a single collection, a database, or an entire
> deployment, and immediately react to them.
>
> - https://www.mongodb.com/docs/manual/changeStreams/
> - https://www.mongodb.com/developer/languages/python/python-change-streams/
## Services

### CrateDB
Start CrateDB.
```shell
docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \
crate:5.7 -Cdiscovery.type=single-node
```

### MongoDB
Start MongoDB.
```shell
docker run -it --rm --name=mongodb --publish=27017:27017 \
mongo:7 mongod --replSet rs-testdrive
```

Please note that change streams are only available for replica sets and
sharded clusters, so let's initialize the replica set defined with
`--replSet rs-testdrive`, by using the `mongosh` command to invoke
the `rs.initiate()` operation.

```shell
export MONGODB_URL="mongodb://localhost/"
docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <<EOF
config = {
_id: "rs-testdrive",
members: [{ _id : 0, host : "localhost:27017"}]
};
rs.initiate(config);
EOF
```


## Install
Acquire and set up the relay program.
```shell
wget https://github.com/daq-tools/commons-codec/raw/main/examples/mongodb_cdc_cratedb.py
pip install 'commons-codec[mongodb]' pymongo sqlalchemy-cratedb
```


## Usage

Configure settings.
```shell
export CRATEDB_SQLALCHEMY_URL="crate://"
export MONGODB_URL="mongodb://localhost/"
```

Invoke relay program.
```shell
python mongodb_cdc_cratedb.py cdc-relay
```

Invoke database workload.
```shell
python mongodb_cdc_cratedb.py db-workload
```


## Troubleshooting

When you see this message on MongoDB's server log, it indicates you tried to
configure a replica set, but did not initialize it yet.
```text
pymongo.errors.OperationFailure: The $changeStream stage is only supported on
replica sets, full error: {'ok': 0.0, 'errmsg': 'The $changeStream stage is
only supported on replica sets', 'code': 40573, 'codeName': 'Location40573'}
```

When you see a `Failed to refresh key cache` error message on MongoDB's server
log, it indicates the server has been successfully running a replica set last
time, but, again, it has not been correctly initialized.
- https://stackoverflow.com/questions/70518350/mongodb-replicaset-failed-to-refresh-key-cache
- https://www.mongodb.com/community/forums/t/how-to-recover-mongodb-from-failed-to-refresh-key-cache/239079
106 changes: 106 additions & 0 deletions examples/mongodb_cdc_cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Basic example relaying a MongoDB Change Stream into CrateDB table.
Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
"""

import datetime as dt
import os
import sys

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB


class MiniRelay:
"""
Relay MongoDB Change Stream into CrateDB table, and provide basic example workload generator.
"""

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_sqlalchemy_url: str,
cratedb_table: str,
):
self.cratedb_client = sa.create_engine(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = cratedb_table
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)

def start(self):
"""
Subscribe to change stream events, convert to SQL, and submit to CrateDB.
"""
with self.cratedb_client.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
connection.execute(sa.text(f'REFRESH TABLE "{self.table_name}";'))

def cdc_to_sql(self):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
"""
# Note that `.watch()` will block until events are ready for consumption, so
# this is not a busy loop. Also note that the routine doesn't perform any sensible
# error handling yet.
while True:
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
for change in change_stream:
print("MongoDB Change Stream event:", change, file=sys.stderr)
yield self.cdc.to_sql(change)

def db_workload(self):
"""
Run insert_one, update_one, and delete_one operations to generate a very basic workload.
"""
example_record = {
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": dt.datetime.fromisoformat("2024-07-12T01:17:42+02:00"), "device": "foo"},
}

print(self.mongodb_collection.insert_one(example_record))
print(self.mongodb_collection.update_one({"id": "5F9E"}, {"$set": {"data": {"temperature": 42.50}}}))

# TODO: Investigate: When applying the "replace" operation, subsequent "delete" operations
# will not be reported to the change stream any longer. Is it a bug?
# print(self.mongodb_collection.replace_one({"id": "5F9E"}, {"tags": ["deleted"]}))

print(self.mongodb_collection.delete_one({"id": "5F9E"}))

# Drop operations are ignored anyway.
# print(self.mongodb_collection.drop())


if __name__ == "__main__":
# Decode subcommand from command line argument.
if len(sys.argv) < 2:
raise ValueError("Subcommand missing. Accepted subcommands: subscribe, workload")
subcommand = sys.argv[1]

# Configure machinery.
relay = MiniRelay(
mongodb_url=os.environ["MONGODB_URL"],
mongodb_database="testdrive",
mongodb_collection="data",
cratedb_sqlalchemy_url=os.environ["CRATEDB_SQLALCHEMY_URL"],
cratedb_table="cdc-testdrive",
)

# Invoke machinery.
if subcommand == "cdc-relay":
relay.start()
elif subcommand == "db-workload":
relay.db_workload()
else:
raise ValueError("Accepted subcommands: subscribe, workload")
12 changes: 10 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ description = "Data decoding, encoding, conversion, and translation utilities."
readme = "README.md"
keywords = [
"airrohr",
"bson",
"cdc",
"conversion",
"convert",
"data",
"decode",
"deserialize",
"dynamodb",
"encode",
"i/o",
"json",
"map data",
"marshall",
"mongodb",
"nested data",
"serialize",
"sql",
Expand Down Expand Up @@ -105,6 +109,9 @@ optional-dependencies.develop = [
"ruff<0.6",
"validate-pyproject<0.19",
]
optional-dependencies.mongodb = [
"pymongo<4.9",
]
optional-dependencies.release = [
"build<2",
"twine<6",
Expand Down Expand Up @@ -168,8 +175,9 @@ lint.extend-ignore = [
]

lint.per-file-ignores."examples/*" = [
"T201",
] # Allow `print`
"ERA001", # Found commented-out code
"T201", # Allow `print`
]

# ===================
# Tasks configuration
Expand Down
Loading

0 comments on commit ab24a07

Please sign in to comment.