Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transform: MongoDB CDC event to CrateDB SQL #4

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: "Tests: PR"
name: "Tests"

on:
push:
Expand All @@ -11,9 +11,10 @@ concurrency:
cancel-in-progress: true

jobs:
test:

test-vanilla:
name: "
Python: ${{ matrix.python-version }}
Vanilla: Python ${{ matrix.python-version }}
"
runs-on: ${{ matrix.os }}
strategy:
Expand Down Expand Up @@ -60,7 +61,62 @@ jobs:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: main
flags: vanilla
env_vars: OS,PYTHON
name: codecov-umbrella
fail_ci_if_error: true


test-mongodb:
name: "
MongoDB: Python ${{ matrix.python-version }}
"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: ['ubuntu-latest']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']

env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}

steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path:
pyproject.toml

- name: Set up project
run: |

# `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb]

- name: Run linters and software tests
run: poe check

# https://github.com/codecov/codecov-action
- name: Upload coverage results to Codecov
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: ./coverage.xml
flags: mongodb
env_vars: OS,PYTHON
name: codecov-umbrella
fail_ci_if_error: true
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Added transformer for MongoDB CDC to CrateDB SQL conversion

## 2024/07/16 v0.0.1
- Added decoders for Airrohr, Tasmota, and TTS/TTN from Kotori DAQ
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ To install the most recent version, run:
pip install --upgrade commons-codec
```

## Usage
In order to learn how to use the library, please visit the [documentation],
and explore the source code or its [examples].

## License
The project uses the LGPLv3 license for the whole ensemble. However, individual
portions of the code base are vendorized from other Python packages, where
Expand All @@ -50,6 +54,8 @@ both libraries' ingredients don't have anything in common, yet.

[Apache Commons Codec]: https://commons.apache.org/proper/commons-codec/
[commons-codec]: https://pypi.org/project/commons-codec/
[documentation]: https://github.com/daq-tools/commons-codec/tree/main/docs
[examples]: https://github.com/daq-tools/commons-codec/tree/main/examples
[Kotori]: https://github.com/daq-tools/kotori
[LorryStream]: https://github.com/daq-tools/lorrystream/
[PyPI]: https://pypi.org/
101 changes: 101 additions & 0 deletions docs/mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Relay MongoDB Change Stream into CrateDB

## About
[mongodb_cdc_cratedb.py] demonstrates a basic example program to relay event
records from [MongoDB Change Streams] into [CrateDB].

> Change streams allow applications to access real-time data changes without the prior
> complexity and risk of manually tailing the oplog. Applications can use change streams
> to subscribe to all data changes on a single collection, a database, or an entire
> deployment, and immediately react to them.
>
> - https://www.mongodb.com/docs/manual/changeStreams/
> - https://www.mongodb.com/developer/languages/python/python-change-streams/


## Services

### CrateDB
Start CrateDB.
```shell
docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \
crate:5.7 -Cdiscovery.type=single-node
```

### MongoDB
Start MongoDB.
Please note that change streams are only available for replica sets and
sharded clusters, so let's define a replica set by using the
`--replSet rs-testdrive` option when starting the MongoDB server.
```shell
docker run -it --rm --name=mongodb --publish=27017:27017 \
mongo:7 mongod --replSet rs-testdrive
```

Now, initialize the replica set, by using the `mongosh` command to invoke
the `rs.initiate()` operation.
```shell
export MONGODB_URL="mongodb://localhost/"
docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <<EOF

config = {
_id: "rs-testdrive",
members: [{ _id : 0, host : "localhost:27017"}]
};
rs.initiate(config);

EOF
```


## Install
Acquire and set up the basic relay program.
```shell
# Install dependencies.
pip install 'commons-codec[mongodb]' pymongo sqlalchemy-cratedb

# Download program.
wget https://github.com/daq-tools/commons-codec/raw/main/examples/mongodb_cdc_cratedb.py
```


## Usage

Configure settings.
```shell
export CRATEDB_SQLALCHEMY_URL="crate://"
export MONGODB_URL="mongodb://localhost/"
```

Invoke relay program.
```shell
python mongodb_cdc_cratedb.py cdc-relay
```

Invoke database workload.
```shell
python mongodb_cdc_cratedb.py db-workload
```


## Troubleshooting

When you see this message on MongoDB's server log, it indicates you tried to
configure a replica set, but did not initialize it yet.
```text
pymongo.errors.OperationFailure: The $changeStream stage is only supported on
replica sets, full error: {'ok': 0.0, 'errmsg': 'The $changeStream stage is
only supported on replica sets', 'code': 40573, 'codeName': 'Location40573'}
```

When you see a `Failed to refresh key cache` error message on MongoDB's server
log, it indicates the server has been successfully running a replica set last
time, but, again, it has not been correctly initialized.

- https://stackoverflow.com/questions/70518350/mongodb-replicaset-failed-to-refresh-key-cache
- https://www.mongodb.com/community/forums/t/how-to-recover-mongodb-from-failed-to-refresh-key-cache/239079


[CrateDB]: https://github.com/crate/crate
[mongodb_cdc_cratedb.py]: https://github.com/daq-tools/commons-codec/raw/main/examples/mongodb_cdc_cratedb.py
[MongoDB Change Streams]: https://www.mongodb.com/docs/manual/changeStreams/
106 changes: 106 additions & 0 deletions examples/mongodb_cdc_cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Basic example relaying a MongoDB Change Stream into CrateDB table.

Documentation:
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md
- https://www.mongodb.com/docs/manual/changeStreams/
- https://www.mongodb.com/developer/languages/python/python-change-streams/
"""

import datetime as dt
import os
import sys

import pymongo
import sqlalchemy as sa
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB


class MiniRelay:
"""
Relay MongoDB Change Stream into CrateDB table, and provide basic example workload generator.
"""

def __init__(
self,
mongodb_url: str,
mongodb_database: str,
mongodb_collection: str,
cratedb_sqlalchemy_url: str,
cratedb_table: str,
):
self.cratedb_client = sa.create_engine(cratedb_sqlalchemy_url, echo=True)
self.mongodb_client = pymongo.MongoClient(mongodb_url)
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection]
self.table_name = cratedb_table
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name)

def start(self):
"""
Subscribe to change stream events, convert to SQL, and submit to CrateDB.
"""
with self.cratedb_client.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
connection.execute(sa.text(f'REFRESH TABLE "{self.table_name}";'))

def cdc_to_sql(self):
"""
Subscribe to change stream events, and emit corresponding SQL statements.
"""
# Note that `.watch()` will block until events are ready for consumption, so
# this is not a busy loop. Also note that the routine doesn't perform any sensible
# error handling yet.
while True:
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream:
for change in change_stream:
print("MongoDB Change Stream event:", change, file=sys.stderr)
yield self.cdc.to_sql(change)

def db_workload(self):
"""
Run insert_one, update_one, and delete_one operations to generate a very basic workload.
"""
example_record = {
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": dt.datetime.fromisoformat("2024-07-12T01:17:42+02:00"), "device": "foo"},
}

print(self.mongodb_collection.insert_one(example_record))
# print(self.mongodb_collection.update_one({"id": "5F9E"}, {"$set": {"data": {"temperature": 42.50}}}))

# TODO: Investigate: When applying the "replace" operation, subsequent "delete" operations
# will not be reported to the change stream any longer. Is it a bug?
# print(self.mongodb_collection.replace_one({"id": "5F9E"}, {"tags": ["deleted"]}))

# print(self.mongodb_collection.delete_one({"id": "5F9E"}))

# Drop operations are ignored anyway.
# print(self.mongodb_collection.drop())


if __name__ == "__main__":
# Decode subcommand from command line argument.
if len(sys.argv) < 2:
raise ValueError("Subcommand missing. Accepted subcommands: subscribe, workload")
subcommand = sys.argv[1]

# Configure machinery.
relay = MiniRelay(
mongodb_url=os.environ["MONGODB_URL"],
mongodb_database="testdrive",
mongodb_collection="data",
cratedb_sqlalchemy_url=os.environ["CRATEDB_SQLALCHEMY_URL"],
cratedb_table="cdc-testdrive",
)

# Invoke machinery.
if subcommand == "cdc-relay":
relay.start()
elif subcommand == "db-workload":
relay.db_workload()
else:
raise ValueError("Accepted subcommands: subscribe, workload")
12 changes: 10 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ description = "Data decoding, encoding, conversion, and translation utilities."
readme = "README.md"
keywords = [
"airrohr",
"bson",
"cdc",
"conversion",
"convert",
"data",
"decode",
"deserialize",
"dynamodb",
"encode",
"i/o",
"json",
"map data",
"marshall",
"mongodb",
"nested data",
"serialize",
"sql",
Expand Down Expand Up @@ -105,6 +109,9 @@ optional-dependencies.develop = [
"ruff<0.6",
"validate-pyproject<0.19",
]
optional-dependencies.mongodb = [
"pymongo<4.9",
]
optional-dependencies.release = [
"build<2",
"twine<6",
Expand Down Expand Up @@ -168,8 +175,9 @@ lint.extend-ignore = [
]

lint.per-file-ignores."examples/*" = [
"T201",
] # Allow `print`
"ERA001", # Found commented-out code
"T201", # Allow `print`
]

# ===================
# Tasks configuration
Expand Down
Loading