diff --git a/CHANGES.md b/CHANGES.md
index 67faa63..17acc5a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,19 @@
 
 ## Unreleased
 
+- Table Loader: Improved conditional handling of the "transformation" parameter
+- Table Loader: Improved status reporting and error logging in `BulkProcessor`
+- MongoDB: Improved error reporting
+- MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well,
+  use `fsspec` and `orjson` instead
+- MongoDB Full: Improved initialization of the transformation subsystem
+- MongoDB Adapter: Improved performance when computing collection cardinality
+  by using `collection.estimated_document_count()`
+- MongoDB Full: Optionally use the `limit` parameter as the total number of records
+- MongoDB Adapter: Evaluate the `_id` filter field by upcasting it to `bson.ObjectId`,
+  to convey a filter that makes `ctk load table` process a single document,
+  identified by its OID
+- MongoDB Dependencies: Update to commons-codec 0.0.17
 
 ## 2024/09/19 v0.0.24
 - MongoDB Full: Refactor transformation subsystem to `commons-codec`
diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py
index b406074..078ac02 100644
--- a/cratedb_toolkit/api/main.py
+++ b/cratedb_toolkit/api/main.py
@@ -19,7 +19,9 @@ class ClusterBase(abc.ABC):
 
     @abstractmethod
-    def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path):
+    def load_table(
+        self, resource: InputOutputResource, target: TableAddress, transformation: t.Union[Path, None] = None
+    ):
         raise NotImplementedError("Child class needs to implement this method")
 
@@ -40,7 +42,7 @@ def load_table(
         self,
         resource: InputOutputResource,
         target: t.Optional[TableAddress] = None,
-        transformation: Path = None,
+        transformation: t.Union[Path, None] = None,
     ):
         """
         Load data into a database table on CrateDB Cloud.
@@ -102,7 +104,9 @@ class StandaloneCluster(ClusterBase):
     address: DatabaseAddress
     info: t.Optional[ClusterInformation] = None
 
-    def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None):
+    def load_table(
+        self, resource: InputOutputResource, target: TableAddress, transformation: t.Union[Path, None] = None
+    ):
         """
         Load data into a database table on a standalone CrateDB Server.
 
@@ -168,4 +172,4 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
             return False
         else:
-            raise NotImplementedError("Importing resource not implemented yet")
+            raise NotImplementedError(f"Importing resource not implemented yet: {source_url_obj}")
diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py
index cd03961..8ebb772 100644
--- a/cratedb_toolkit/io/cli.py
+++ b/cratedb_toolkit/io/cli.py
@@ -1,5 +1,6 @@
 import logging
 import sys
+import typing as t
 from pathlib import Path
 
 import click
@@ -49,7 +50,7 @@ def load_table(
     table: str,
     format_: str,
     compression: str,
-    transformation: Path,
+    transformation: t.Union[Path, None],
 ):
     """
     Import data into CrateDB and CrateDB Cloud clusters.
@@ -64,6 +65,10 @@
     if not cluster_id and not cratedb_sqlalchemy_url and not cratedb_http_url:
         raise KeyError(error_message)
 
+    # When `--transformation` is given, but empty, reset it to `None`.
+    if transformation is not None and transformation.name == "":
+        transformation = None
+
     # When SQLAlchemy URL is not given, but HTTP URL is, compute the former on demand.
if cluster_id: address = None diff --git a/cratedb_toolkit/io/core.py b/cratedb_toolkit/io/core.py index d793b89..157e1e6 100644 --- a/cratedb_toolkit/io/core.py +++ b/cratedb_toolkit/io/core.py @@ -1,4 +1,5 @@ # TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration? +import json import typing as t from functools import cached_property @@ -120,14 +121,15 @@ def log_level(self): def start(self) -> BulkMetrics: # Acquire batches of documents, convert to SQL operations, and submit to CrateDB. + batch_count = 0 for batch in self.data: + batch_count += 1 + self.progress_bar and self.progress_bar.set_description("READ ") current_batch_size = len(batch) - - self.progress_bar and self.progress_bar.set_description("ACQUIRE") - try: operation = self.batch_to_operation(batch) except Exception as ex: + self._metrics.count_error_total += current_batch_size self.log_level(f"Computing query failed: {ex}") if self.on_error == "raise": raise @@ -137,7 +139,7 @@ def start(self) -> BulkMetrics: statement = sa.text(operation.statement) # Submit operation to CrateDB, using `bulk_args`. - self.progress_bar and self.progress_bar.set_description("SUBMIT ") + self.progress_bar and self.progress_bar.set_description("WRITE") try: cursor = self.connection.execute(statement=statement, parameters=operation.parameters) self.connection.commit() @@ -158,7 +160,7 @@ def start(self) -> BulkMetrics: # in order to relay proper error messages to the user. if failed_records: logger.warning( - f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. " + f"Incomplete batch #{batch_count}. Records processed: {count_success_local}/{current_batch_size}. " f"Falling back to per-record operations." ) for record in failed_records: @@ -167,8 +169,8 @@ def start(self) -> BulkMetrics: self.connection.commit() self._metrics.count_success_total += 1 except Exception as ex: - logger.warning(f"Operation failed: {ex}") - logger.debug(f"Failing record: {record}") + logger.error(f"Operation failed: {ex}") + logger.debug(f"Invalid record:\n{json.dumps(record, indent=2)}") self._metrics.count_error_total += 1 self._metrics.bytes_error_total += asizeof(record) if self.on_error == "raise": diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index 9163321..b9deb5c 100644 --- a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -4,11 +4,12 @@ import logging import typing as t from abc import abstractmethod +from copy import deepcopy from functools import cached_property from pathlib import Path import boltons.urlutils -import polars as pl +import bson import pymongo import yarl from attrs import define, field @@ -18,6 +19,7 @@ from cratedb_toolkit.io.mongodb.util import batches from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util.io import read_json logger = logging.getLogger(__name__) @@ -56,7 +58,7 @@ def batch_size(self) -> int: return int(self.address.uri.query_params.get("batch-size", 100)) @cached_property - def filter(self) -> t.Union[str, None]: + def filter(self) -> t.Union[t.Dict[str, t.Any], None]: return json.loads(self.address.uri.query_params.get("filter", "null")) @cached_property @@ -106,13 +108,13 @@ def query(self): if not self._path.exists(): raise FileNotFoundError(f"Resource not found: {self._path}") if self.filter: - raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using filter expressions is not 
supported by filesystem adapter") + if self.limit: + raise NotImplementedError("Using limit parameter is not supported by filesystem adapter") if self.offset: - raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using offset parameter is not supported by filesystem adapter") if self._path.suffix in [".json", ".jsonl", ".ndjson"]: - data = pl.read_ndjson( - self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True - ).to_dicts() + data = read_json(str(self._path)) elif ".bson" in str(self._path): data = IterableData(str(self._path), options={"format_in": "bson"}).iter() else: @@ -137,15 +139,15 @@ def record_count(self, filter_=None) -> int: def query(self): if self.filter: - raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using filter expressions is not supported by remote resource adapter") + if self.limit: + raise NotImplementedError("Using limit parameter is not supported by remote resource adapter") if self.offset: - raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using offset parameter is not supported by remote resource adapter") if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"): - data = pl.read_ndjson( - str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True - ).to_dicts() + data = read_json(str(self._url)) elif self._url.path.endswith(".bson"): - raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC") + raise NotImplementedError("HTTP+BSON loader does not support .bson files yet.") else: raise ValueError(f"Unsupported file type: {self._url}") return batches(data, self.batch_size) @@ -170,12 +172,20 @@ def get_collections(self) -> t.List[str]: return sorted(database.list_collection_names()) def record_count(self, filter_=None) -> int: + """ + # Exact. Takes too long on large collections. filter_ = filter_ or {} return self._mongodb_collection.count_documents(filter=filter_) + """ + return self._mongodb_collection.estimated_document_count() def query(self): + _filter = deepcopy(self.filter) + # Evaluate `_id` filter field specially, by upcasting to `bson.ObjectId`. 
+ if _filter and "_id" in _filter: + _filter["_id"] = bson.ObjectId(_filter["_id"]) data = ( - self._mongodb_collection.find(filter=self.filter) + self._mongodb_collection.find(filter=_filter) .batch_size(self.batch_size) .skip(self.offset) .limit(self.limit) diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 12c05ca..1b946f3 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -5,6 +5,7 @@ from boltons.urlutils import URL from polars.exceptions import PanicException +from zyp.model.project import TransformationProject from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB @@ -93,7 +94,7 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi def mongodb_copy( source_url: t.Union[str, URL], target_url: t.Union[str, URL], - transformation: t.Union[Path, None] = None, + transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None, progress: bool = False, ): """ @@ -111,9 +112,7 @@ def mongodb_copy( target_url = URL(target_url) # Optionally configure transformations. - tm = None - if transformation: - tm = TransformationManager(path=transformation) + tm = TransformationManager.from_any(transformation) # Check if source address URL includes a table name or not. has_table = True @@ -169,7 +168,7 @@ def mongodb_copy( try: outcome_task = task.start() except (Exception, PanicException) as ex: - logger.error(f"Task failed: {ex}") + logger.exception(f"Task failed: {ex}") outcome_task = False outcome = outcome and outcome_task diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index a88ff00..febfbe4 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -71,7 +71,11 @@ def start(self): Read items from MongoDB table, convert to SQL INSERT statements, and submit to CrateDB. """ logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}") - records_in = self.mongodb_adapter.record_count() + limit = self.mongodb_adapter.limit + if limit > 0: + records_in = limit + else: + records_in = self.mongodb_adapter.record_count() logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}") with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm(): if not self.cratedb_adapter.table_exists(self.cratedb_table): diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index 5a34207..2d63605 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -92,7 +92,7 @@ def extract(args) -> t.Dict[str, t.Any]: rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan") - tm = TransformationManager(path=args.transformation) + tm = TransformationManager.from_path(path=args.transformation) schemas = OrderedDict() for collection_name in filtered_collections: collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit) @@ -121,7 +121,7 @@ def export(args) -> t.IO[bytes]: TODO: Run on multiple collections, like `extract`. 
""" - tm = TransformationManager(path=args.transformation) + tm = TransformationManager.from_path(path=args.transformation) buffer = io.BytesIO() client, db = get_mongodb_client_database(args, document_class=RawBSONDocument) collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit) diff --git a/cratedb_toolkit/io/mongodb/transform.py b/cratedb_toolkit/io/mongodb/transform.py index 13e27d5..f71dafc 100644 --- a/cratedb_toolkit/io/mongodb/transform.py +++ b/cratedb_toolkit/io/mongodb/transform.py @@ -10,17 +10,33 @@ class TransformationManager: - def __init__(self, path: Path): - self.path = path - self.active = False - if not self.path: - return - if not self.path.exists(): - raise FileNotFoundError(f"File does not exist: {self.path}") - self.project = TransformationProject.from_yaml(self.path.read_text()) - logger.info("Transformation manager initialized. File: %s", self.path) + def __init__(self, project: TransformationProject): + self.project = project self.active = True + @classmethod + def from_any(cls, transformation=None): + if transformation is None: + return None + elif isinstance(transformation, TransformationManager): + return transformation + elif isinstance(transformation, TransformationProject): + return cls(project=transformation) + elif isinstance(transformation, Path): + return cls.from_path(path=transformation) + else: + raise ValueError(f"Unable to initialize transformation manager from {type(transformation)}") + + @classmethod + def from_path(cls, path: Path): + if not path: + return None + if not path.exists(): + raise FileNotFoundError(f"File does not exist: {path}") + logger.info("Loading Zyp transformation file: %s", path) + project = TransformationProject.from_yaml(path.read_text()) + return cls(project=project) + def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]): if not self.active: return diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 0aed69a..0df03b5 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,7 @@ import re import typing as t +from commons_codec.transform.mongodb import Document from pymongo.cursor import Cursor from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents @@ -44,7 +45,9 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict: return d -def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]: +def batches( + data: t.Union[Cursor, Documents, t.Generator[Document, None, None]], batch_size: int = 100 +) -> t.Generator[Documents, None, None]: """ Generate batches of documents. """ diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index f5ebc3e..17d27b0 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -24,7 +24,7 @@ def boot_click(ctx: click.Context, verbose: bool = False, debug: bool = False): log_level = logging.DEBUG # Setup logging, according to `verbose` / `debug` flags. - setup_logging(level=log_level, verbose=verbose) + setup_logging(level=log_level, verbose=verbose, debug=debug) def split_list(value: str, delimiter: str = ",") -> t.List[str]: diff --git a/cratedb_toolkit/util/common.py b/cratedb_toolkit/util/common.py index c5aaa6e..9ab9cd1 100644 --- a/cratedb_toolkit/util/common.py +++ b/cratedb_toolkit/util/common.py @@ -1,12 +1,15 @@ # Copyright (c) 2023, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. 
 import logging
+import os
 
 import colorlog
 from colorlog.escape_codes import escape_codes
 
+from cratedb_toolkit.util.data import asbool
 
-def setup_logging(level=logging.INFO, verbose: bool = False, width: int = 36):
+
+def setup_logging(level=logging.INFO, verbose: bool = False, debug: bool = False, width: int = 36):
     reset = escape_codes["reset"]
     log_format = f"%(asctime)-15s [%(name)-{width}s] %(log_color)s%(levelname)-8s:{reset} %(message)s"
 
@@ -15,14 +18,25 @@
     logging.basicConfig(format=log_format, level=level, handlers=[handler])
 
-    # Enable SQLAlchemy logging.
-    if verbose:
-        logging.getLogger("sqlalchemy").setLevel(level)
-
     logging.getLogger("crate.client").setLevel(level)
     logging.getLogger("sqlalchemy_cratedb").setLevel(level)
     logging.getLogger("urllib3.connectionpool").setLevel(level)
 
+    # Enable debug logging for cratedb_toolkit itself.
+    if verbose:
+        logging.getLogger("cratedb_toolkit").setLevel(logging.DEBUG)
+
+    if debug:
+        # Optionally tame SQLAlchemy and PyMongo.
+        if asbool(os.environ.get("DEBUG_SQLALCHEMY")):
+            logging.getLogger("sqlalchemy").setLevel(level)
+        else:
+            logging.getLogger("sqlalchemy").setLevel(logging.INFO)
+        if asbool(os.environ.get("DEBUG_PYMONGO")):
+            logging.getLogger("pymongo").setLevel(level)
+        else:
+            logging.getLogger("pymongo").setLevel(logging.INFO)
+
     # logging.getLogger("docker.auth").setLevel(logging.INFO)  # noqa: ERA001
 
     # Tame Faker spamming the logs.
diff --git a/cratedb_toolkit/util/io.py b/cratedb_toolkit/util/io.py
new file mode 100644
index 0000000..f52fc43
--- /dev/null
+++ b/cratedb_toolkit/util/io.py
@@ -0,0 +1,15 @@
+import orjsonl
+from fsspec import filesystem
+from upath import UPath
+
+
+def read_json(url: str):
+    """
+    Read a JSON file from anywhere.
+    TODO: Does the resource need to be closed? How?
+    """
+    p = UPath(url)
+    fs = filesystem(p.protocol, **p.storage_options)  # equivalent to p.fs
+    fp = fs.open(p.path)
+    data = orjsonl.stream(fp)
+    return data
diff --git a/doc/backlog.md b/doc/backlog.md
index 1a25a5d..0a34d86 100644
--- a/doc/backlog.md
+++ b/doc/backlog.md
@@ -1,5 +1,26 @@
 # Backlog
 
+## Iteration +1
+- Table Loader: Refactor URL dispatcher, use fsspec
+- Table Loader/Docs: Advertise using OCI image
+- MongoDB: Load table by querying for a single object id
+- MongoDB: Multi-phase BulkProcessor batch size adjustments
+- MongoDB: Report byte sizes (cur/avg/total) in progress bar
+- Documentation:
+  > The procedure employed by CTK uses the catch-all `data OBJECT(DYNAMIC)`
+  > storage strategy, which sinks the source record/document into a
+  > single column in CrateDB.
+  >
+  > The transformation recipe attempts to outline a few features provided by
+  > Zyp Transformations, in this case exclusively applying transformations
+  > described by expressions written in jqlang.
+- MongoDB/Docs: Describe usage of `mongoimport` and `mongoexport`.
+  ```shell
+  mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority'
+  ```
+- MongoDB: Convert dates like `"date": "Sep 18 2015"`, see `testdrive.city_inspections`.
+- Table Loader: Propagate offset/limit to progress bar
+
 ## Iteration +2
 - Address `fix_job_info_table_name`
 - Add more items about `ctk load table` to `examples/` folder
diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md
index 00324b2..ad638bf 100644
--- a/doc/io/mongodb/loader.md
+++ b/doc/io/mongodb/loader.md
@@ -48,6 +48,12 @@ Transfer all collections in database from MongoDB Atlas.
 export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ticker
 ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker?batch-size=5000"
 ```
+:::{important}
+When transferring **multiple collections**, make sure to use a CrateDB database
+address which DOES NOT reference an individual table.
+It MUST stop at the **schema** label, here, `ticker`. Likewise, the MongoDB
+database address also MUST reference a **database**, NOT a specific collection.
+:::
 
 ### MongoDB Community and Enterprise
 Transfer data from MongoDB database/collection into CrateDB schema/table.
@@ -73,12 +79,21 @@ ctk load table "file+bson:///path/to/mongodb-json-files/datasets/books.json"
 
 # Extended JSON, HTTP resource.
 ctk load table "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json"
 
-# Extended JSON, filesystem, multiple files.
-ctk load table "file+bson:///path/to/mongodb-json-files/datasets/*.json"
-
 # BSON, filesystem, relative path, compressed.
 ctk load table "file+bson:./var/data/testdrive/books.bson.gz"
+
+# Extended JSON, filesystem, multiple files.
+ctk load table \
+    "file+bson:///path/to/mongodb-json-files/datasets/*.json?batch-size=2500" \
+    --cratedb-sqlalchemy-url="crate://crate@localhost:4200/datasets"
 ```
+:::{important}
+When transferring **multiple collections**, make sure to use a CrateDB database
+address which DOES NOT reference an individual table.
+It MUST stop at the **schema** label, here, `datasets`. Likewise, the path to
+the MongoDB JSON files also MUST reference the **parent folder**, NOT a specific
+JSON or BSON file.
+:::
 
 To exercise a full example importing multiple MongoDB Extended JSON files,
 see [](#file-import-tutorial).
@@ -94,8 +109,9 @@ appending the HTTP URL query parameter `batch-size` to the source URL, like
 
 ### Filter
 Use the HTTP URL query parameter `filter` on the source URL, like
-`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
-query filter as a JSON string.
+`&filter={"exchange":{"$eq":"NASDAQ"}}`, or
+`&filter={"_id":"66f0002e98c00fb8261d87c8"}`,
+in order to provide a MongoDB query filter as a JSON string.
 It works in the same way as `mongoexport`'s `--query` option.
 For more complex query expressions, make sure to properly encode the
 value using URL/Percent Encoding.
@@ -208,15 +224,19 @@ Alternatively, have a look at the canonical MongoDB C driver's
 [libbson test files].
 :::
 
+## Troubleshooting
+When importing from a BSON file and observing a traceback like this,
+```python
+Traceback (most recent call last):
+  File "/path/to/site-packages/bson/__init__.py", line 1356, in decode_file_iter
+    yield _bson_to_dict(elements, opts)  # type:ignore[type-var, arg-type, misc]
+          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+bson.errors.InvalidBSON: not enough data for a BSON document
+```
+please try to import the file into a MongoDB Server using `mongorestore`,
+and export it again using `mongodump` or `mongoexport`, preferably using
+recent versions like MongoDB 7 and tools version 100.9.5 or higher.
 
-### Backlog
-:::{todo}
-- Describe usage of `mongoimport` and `mongoexport`.
-  ```shell
-  mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority'
-  ```
-- Convert dates like `"date": "Sep 18 2015"`, see `testdrive.city_inspections`.
-:::
 
 [BSON]: https://en.wikipedia.org/wiki/BSON
diff --git a/pyproject.toml b/pyproject.toml
index af3c1ff..58705ed 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -94,6 +94,7 @@ dependencies = [
   "cratedb-sqlparse==0.0.6",
   'importlib-metadata; python_version < "3.8"',
   'importlib-resources; python_version < "3.9"',
+  "orjsonl<2",
   "polars<1.7",
   "pympler<1.2",
   "python-dateutil<3",
@@ -154,8 +155,10 @@ influxdb = [
 io = [
   "cr8",
   "dask[dataframe]>=2020",
+  "fsspec[s3,http]",
   "pandas<3,>=1",
   "sqlalchemy>=2",
+  "universal-pathlib<0.3",
 ]
 kinesis = [
   "aiobotocore<2.16",
@@ -164,7 +167,7 @@ kinesis = [
   "lorrystream[carabas]>=0.0.6",
 ]
 mongodb = [
-  "commons-codec[mongodb,zyp]>=0.0.16",
+  "commons-codec[mongodb,zyp]>=0.0.17",
   "cratedb-toolkit[io]",
   "orjson<4,>=3.3.1",
   "pymongo<5,>=3.10.1",
diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py
index a4350a2..7c9a11d 100644
--- a/tests/io/mongodb/test_copy.py
+++ b/tests/io/mongodb/test_copy.py
@@ -3,8 +3,12 @@
 from pathlib import Path
 from unittest import mock
 
+import bson
 import pymongo
 import pytest
+from zyp import CollectionTransformation, MokshaTransformation
+from zyp.model.collection import CollectionAddress
+from zyp.model.project import TransformationProject
 
 from cratedb_toolkit.io.mongodb.api import mongodb_copy
 from tests.conftest import check_sqlalchemy2
@@ -61,7 +65,7 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb):
     assert results[0]["data"] == data_out
 
 
-def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb):
+def test_mongodb_copy_server_collection_with_filter_timestamp(caplog, cratedb, mongodb):
     """
     Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering.
     """
@@ -99,6 +103,54 @@
     assert results[0]["data"] == data_out[1]
 
 
+def test_mongodb_copy_server_collection_with_filter_oid(caplog, cratedb, mongodb):
+    """
+    Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering by OID.
+    """
+
+    # Define source and target URLs.
+    filter_expression = json.dumps({"_id": "66f0002e98c00fb8261d87c8"})
+    mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo?filter={filter_expression}"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
+
+    # Define data.
+    data_in = [
+        {
+            "_id": bson.ObjectId("66efff8de45f4f7b695134a6"),
+            "device": "Räuber",
+            "temperature": 42.42,
+            "timestamp": 1563051934000,
+        },
+        {
+            "_id": bson.ObjectId("66f0002e98c00fb8261d87c8"),
+            "device": "Hotzenplotz",
+            "temperature": 84.84,
+            "timestamp": 1563051934100,
+        },
+    ]
+    data_out = deepcopy(data_in)
+    data_out[0].update({"_id": mock.ANY})
+    data_out[1].update({"_id": mock.ANY})
+
+    # Populate source database.
+    client: pymongo.MongoClient = mongodb.get_connection_client()
+    testdrive = client.get_database("testdrive")
+    demo = testdrive.create_collection("demo")
+    demo.insert_many(data_in)
+
+    # Run transfer command.
+    mongodb_copy(
+        mongodb_url,
+        cratedb_url,
+    )
+
+    # Verify data in target database.
+    cratedb.database.refresh_table("testdrive.demo")
+    assert cratedb.database.count_records("testdrive.demo") == 1
+    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)
+    assert results[0]["data"] == data_out[1]
+
+
 def test_mongodb_copy_filesystem_folder_absolute(caplog, cratedb, mongodb):
     """
     Verify MongoDB -> CrateDB data transfer for all files in a folder, with absolute addressing.
@@ -283,7 +335,13 @@ def test_mongodb_copy_http_json_relaxed(caplog, cratedb): cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Run transfer command. - mongodb_copy(json_resource, cratedb_url) + transformation = TransformationProject().add( + CollectionTransformation( + address=CollectionAddress(container="datasets", name="books"), + pre=MokshaTransformation().jq(".[] |= (._id |= tostring)"), + ) + ) + mongodb_copy(json_resource, cratedb_url, transformation=transformation) # Verify metadata in target database. assert cratedb.database.table_exists("testdrive.demo") is True
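
As a usage sketch complementing the tests above, the new OID filter can also be driven programmatically through `mongodb_copy()`, mirroring `test_mongodb_copy_server_collection_with_filter_oid`. The connection URLs and the OID value below are illustrative assumptions, not part of the patch.

```python
import json

from cratedb_toolkit.io.mongodb.api import mongodb_copy

# Transfer a single document, identified by its OID. The MongoDB adapter
# upcasts the `_id` string to `bson.ObjectId` before querying.
filter_expression = json.dumps({"_id": "66f0002e98c00fb8261d87c8"})
mongodb_copy(
    f"mongodb://localhost:27017/testdrive/demo?filter={filter_expression}",
    "crate://crate@localhost:4200/testdrive/demo",
)
```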