From d8f993e208432b589007e1639dcd04b619bc4187 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 11:47:39 +0200 Subject: [PATCH 01/11] Table Loader: Improve conditional handling of "transformation" parameter --- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 12 ++++++++---- cratedb_toolkit/io/cli.py | 7 ++++++- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 67faa63..638e0f8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- Table Loader: Improved conditional handling of "transformation" parameter ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index b406074..078ac02 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -19,7 +19,9 @@ class ClusterBase(abc.ABC): @abstractmethod - def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path): + def load_table( + self, resource: InputOutputResource, target: TableAddress, transformation: t.Union[Path, None] = None + ): raise NotImplementedError("Child class needs to implement this method") @@ -40,7 +42,7 @@ def load_table( self, resource: InputOutputResource, target: t.Optional[TableAddress] = None, - transformation: Path = None, + transformation: t.Union[Path, None] = None, ): """ Load data into a database table on CrateDB Cloud. @@ -102,7 +104,9 @@ class StandaloneCluster(ClusterBase): address: DatabaseAddress info: t.Optional[ClusterInformation] = None - def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None): + def load_table( + self, resource: InputOutputResource, target: TableAddress, transformation: t.Union[Path, None] = None + ): """ Load data into a database table on a standalone CrateDB Server. @@ -168,4 +172,4 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf return False else: - raise NotImplementedError("Importing resource not implemented yet") + raise NotImplementedError(f"Importing resource not implemented yet: {source_url_obj}") diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index cd03961..8ebb772 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -1,5 +1,6 @@ import logging import sys +import typing as t from pathlib import Path import click @@ -49,7 +50,7 @@ def load_table( table: str, format_: str, compression: str, - transformation: Path, + transformation: t.Union[Path, None], ): """ Import data into CrateDB and CrateDB Cloud clusters. @@ -64,6 +65,10 @@ def load_table( if not cluster_id and not cratedb_sqlalchemy_url and not cratedb_http_url: raise KeyError(error_message) + # When `--transformation` is given, but empty, fix it. + if transformation is not None and transformation.name == "": + transformation = None + # When SQLAlchemy URL is not given, but HTTP URL is, compute the former on demand. 
if cluster_id: address = None From 6850bd26c7bfc28ccb24f934891081ba21e0c3f0 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 11:50:10 +0200 Subject: [PATCH 02/11] Table Loader: Improve status reporting + error logging in BulkProcessor --- CHANGES.md | 1 + cratedb_toolkit/io/core.py | 16 +++++++++------- cratedb_toolkit/util/cli.py | 2 +- cratedb_toolkit/util/common.py | 24 +++++++++++++++++++----- 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 638e0f8..f56cbe4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Table Loader: Improved conditional handling of "transformation" parameter +- Table Loader: Improved status reporting and error logging in `BulkProcessor` ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/cratedb_toolkit/io/core.py b/cratedb_toolkit/io/core.py index d793b89..157e1e6 100644 --- a/cratedb_toolkit/io/core.py +++ b/cratedb_toolkit/io/core.py @@ -1,4 +1,5 @@ # TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration? +import json import typing as t from functools import cached_property @@ -120,14 +121,15 @@ def log_level(self): def start(self) -> BulkMetrics: # Acquire batches of documents, convert to SQL operations, and submit to CrateDB. + batch_count = 0 for batch in self.data: + batch_count += 1 + self.progress_bar and self.progress_bar.set_description("READ ") current_batch_size = len(batch) - - self.progress_bar and self.progress_bar.set_description("ACQUIRE") - try: operation = self.batch_to_operation(batch) except Exception as ex: + self._metrics.count_error_total += current_batch_size self.log_level(f"Computing query failed: {ex}") if self.on_error == "raise": raise @@ -137,7 +139,7 @@ def start(self) -> BulkMetrics: statement = sa.text(operation.statement) # Submit operation to CrateDB, using `bulk_args`. - self.progress_bar and self.progress_bar.set_description("SUBMIT ") + self.progress_bar and self.progress_bar.set_description("WRITE") try: cursor = self.connection.execute(statement=statement, parameters=operation.parameters) self.connection.commit() @@ -158,7 +160,7 @@ def start(self) -> BulkMetrics: # in order to relay proper error messages to the user. if failed_records: logger.warning( - f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. " + f"Incomplete batch #{batch_count}. Records processed: {count_success_local}/{current_batch_size}. " f"Falling back to per-record operations." ) for record in failed_records: @@ -167,8 +169,8 @@ def start(self) -> BulkMetrics: self.connection.commit() self._metrics.count_success_total += 1 except Exception as ex: - logger.warning(f"Operation failed: {ex}") - logger.debug(f"Failing record: {record}") + logger.error(f"Operation failed: {ex}") + logger.debug(f"Invalid record:\n{json.dumps(record, indent=2)}") self._metrics.count_error_total += 1 self._metrics.bytes_error_total += asizeof(record) if self.on_error == "raise": diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index f5ebc3e..17d27b0 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -24,7 +24,7 @@ def boot_click(ctx: click.Context, verbose: bool = False, debug: bool = False): log_level = logging.DEBUG # Setup logging, according to `verbose` / `debug` flags. 
-    setup_logging(level=log_level, verbose=verbose)
+    setup_logging(level=log_level, verbose=verbose, debug=debug)
 
 
 def split_list(value: str, delimiter: str = ",") -> t.List[str]:
diff --git a/cratedb_toolkit/util/common.py b/cratedb_toolkit/util/common.py
index c5aaa6e..9ab9cd1 100644
--- a/cratedb_toolkit/util/common.py
+++ b/cratedb_toolkit/util/common.py
@@ -1,12 +1,15 @@
 # Copyright (c) 2023, Crate.io Inc.
 # Distributed under the terms of the AGPLv3 license, see LICENSE.
 import logging
+import os
 
 import colorlog
 from colorlog.escape_codes import escape_codes
 
+from cratedb_toolkit.util.data import asbool
 
-def setup_logging(level=logging.INFO, verbose: bool = False, width: int = 36):
+
+def setup_logging(level=logging.INFO, verbose: bool = False, debug: bool = False, width: int = 36):
     reset = escape_codes["reset"]
     log_format = f"%(asctime)-15s [%(name)-{width}s] %(log_color)s%(levelname)-8s:{reset} %(message)s"
 
@@ -15,14 +18,25 @@
 
     logging.basicConfig(format=log_format, level=level, handlers=[handler])
 
-    # Enable SQLAlchemy logging.
-    if verbose:
-        logging.getLogger("sqlalchemy").setLevel(level)
-
     logging.getLogger("crate.client").setLevel(level)
     logging.getLogger("sqlalchemy_cratedb").setLevel(level)
    logging.getLogger("urllib3.connectionpool").setLevel(level)
 
+    # Enable `cratedb_toolkit` DEBUG logging.
+    if verbose:
+        logging.getLogger("cratedb_toolkit").setLevel(logging.DEBUG)
+
+    if debug:
+        # Optionally tame SQLAlchemy and PyMongo.
+        if asbool(os.environ.get("DEBUG_SQLALCHEMY")):
+            logging.getLogger("sqlalchemy").setLevel(level)
+        else:
+            logging.getLogger("sqlalchemy").setLevel(logging.INFO)
+        if asbool(os.environ.get("DEBUG_PYMONGO")):
+            logging.getLogger("pymongo").setLevel(level)
+        else:
+            logging.getLogger("pymongo").setLevel(logging.INFO)
+
     # logging.getLogger("docker.auth").setLevel(logging.INFO)  # noqa: ERA001
 
     # Tame Faker spamming the logs.

From 8426f90fc8e3ab5e3aa38e087cf0ab48063299a7 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Sun, 22 Sep 2024 12:01:46 +0200
Subject: [PATCH 03/11] Chore: Update backlog

---
 doc/backlog.md | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/doc/backlog.md b/doc/backlog.md
index 1a25a5d..026fb76 100644
--- a/doc/backlog.md
+++ b/doc/backlog.md
@@ -1,5 +1,25 @@
 # Backlog
 
+## Iteration +1
+- Table Loader: Refactor URL dispatcher, use fsspec
+- Table Loader/Docs: Advertise using OCI image
+- MongoDB: Load table with querying by single object id
+- MongoDB: Multi-phase BulkProcessor batch size adjustments
+- MongoDB: Report byte sizes (cur/avg/total) in progress bar
+- Documentation:
+  > The procedure employed by CTK uses the catch-all `data OBJECT(DYNAMIC)`
+  > storage strategy, which is sinking the source record/document into a
+  > single column in CrateDB.
+  >
+  > The transformation recipe attempts to outline a few features provided by
+  > Zyp Transformations, in this case exclusively applying transformations
+  > described by expressions written in jqlang.
+- MongoDB/Docs: Describe usage of `mongoimport` and `mongoexport`.
+  ```shell
+  mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority'
+  ```
+- MongoDB: Convert dates like `"date": "Sep 18 2015"`, see `testdrive.city_inspections`.
+
+ ## Iteration +2 - Address `fix_job_info_table_name` - Add more items about `ctk load table` to `examples/` folder From 52f7c08e3417614b6863952a094db0ef554a0591 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 12:02:25 +0200 Subject: [PATCH 04/11] MongoDB Full: Improve documentation --- doc/io/mongodb/loader.md | 41 +++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md index 00324b2..db2c01b 100644 --- a/doc/io/mongodb/loader.md +++ b/doc/io/mongodb/loader.md @@ -48,6 +48,12 @@ Transfer all collections in database from MongoDB Atlas. export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/ticker ctk load table "mongodb+srv://john:EeY6OocooL8rungu@testdrive.ahnaik1.mongodb.net/ticker?batch-size=5000" ``` +:::{important} +When transferring **multiple collections**, make sure to use a CrateDB database +address which DOES NOT reference an individual table. +It MUST stop at the **schema** label, here, `ticker`. Likewise, the MongoDB +database address also MUST reference a **database**, NOT a specific collection. +::: ### MongoDB Community and Enterprise Transfer data from MongoDB database/collection into CrateDB schema/table. @@ -73,12 +79,21 @@ ctk load table "file+bson:///path/to/mongodb-json-files/datasets/books.json" # Extended JSON, HTTP resource. ctk load table "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/books.json" -# Extended JSON, filesystem, multiple files. -ctk load table "file+bson:///path/to/mongodb-json-files/datasets/*.json" - # BSON, filesystem, relative path, compressed. ctk load table "file+bson:./var/data/testdrive/books.bson.gz" + +# Extended JSON, filesystem, multiple files. +ctk load table \ + "file+bson:///path/to/mongodb-json-files/datasets/*.json?batch-size=2500" \ + --cratedb-sqlalchemy-url="crate://crate@localhost:4200/datasets" ``` +:::{important} +When transferring **multiple collections**, make sure to use a CrateDB database +address which DOES NOT reference an individual table. +It MUST stop at the **schema** label, here, `datasets`. Likewise, the path to +the MongoDB JSON files also MUST reference the **parent folder**, NOT a specific +JSON or BSON file. +::: To exercise a full example importing multiple MongoDB Extended JSON files, see [](#file-import-tutorial). @@ -208,15 +223,19 @@ Alternatively, have a look at the canonical MongoDB C driver's [libbson test files]. ::: +## Troubleshooting +When importing from a BSON file, and observing a traceback like this, +```python +Traceback (most recent call last): + File "/path/to/site-packages/bson/__init__.py", line 1356, in decode_file_iter + yield _bson_to_dict(elements, opts) # type:ignore[type-var, arg-type, misc] + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +bson.errors.InvalidBSON: not enough data for a BSON document +``` +please try to import the file into a MongoDB Server using `mongorestore`, +and export it again using `mongodump` or `mongoexport`, preferably using +recent versions like MongoDB 7 and tools version 100.9.5 or higher. -### Backlog -:::{todo} -- Describe usage of `mongoimport` and `mongoexport`. - ```shell - mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority' - ``` -- Convert dates like `"date": "Sep 18 2015"`, see `testdrive.city_inspections`. 
-::: [BSON]: https://en.wikipedia.org/wiki/BSON From 3d70f1afad852483efc2835435fffad180f70a4e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 12:03:28 +0200 Subject: [PATCH 05/11] MongoDB: Improve error reporting --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/api.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index f56cbe4..8265cbf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ ## Unreleased - Table Loader: Improved conditional handling of "transformation" parameter - Table Loader: Improved status reporting and error logging in `BulkProcessor` +- MongoDB: Improve error reporting ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 12c05ca..37c7cd2 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -169,7 +169,7 @@ def mongodb_copy( try: outcome_task = task.start() except (Exception, PanicException) as ex: - logger.error(f"Task failed: {ex}") + logger.exception(f"Task failed: {ex}") outcome_task = False outcome = outcome and outcome_task From 8a576905286c05518a6cfaf008e8528ea6cabb58 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 13:04:54 +0200 Subject: [PATCH 06/11] MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well Use `fsspec` and `orjson` instead. --- CHANGES.md | 2 ++ cratedb_toolkit/io/mongodb/adapter.py | 25 +++++++++++++------------ cratedb_toolkit/io/mongodb/util.py | 5 ++++- cratedb_toolkit/util/io.py | 15 +++++++++++++++ pyproject.toml | 3 +++ 5 files changed, 37 insertions(+), 13 deletions(-) create mode 100644 cratedb_toolkit/util/io.py diff --git a/CHANGES.md b/CHANGES.md index 8265cbf..134cac1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,8 @@ - Table Loader: Improved conditional handling of "transformation" parameter - Table Loader: Improved status reporting and error logging in `BulkProcessor` - MongoDB: Improve error reporting +- MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well, + use `fsspec` and `orjson` instead ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py index 9163321..8a4629d 100644 --- a/cratedb_toolkit/io/mongodb/adapter.py +++ b/cratedb_toolkit/io/mongodb/adapter.py @@ -8,7 +8,7 @@ from pathlib import Path import boltons.urlutils -import polars as pl +import bson import pymongo import yarl from attrs import define, field @@ -18,6 +18,7 @@ from cratedb_toolkit.io.mongodb.util import batches from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util.io import read_json logger = logging.getLogger(__name__) @@ -106,13 +107,13 @@ def query(self): if not self._path.exists(): raise FileNotFoundError(f"Resource not found: {self._path}") if self.filter: - raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using filter expressions is not supported by filesystem adapter") + if self.limit: + raise NotImplementedError("Using limit parameter is not supported by filesystem adapter") if self.offset: - raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using offset parameter is not supported by filesystem adapter") if self._path.suffix in [".json", ".jsonl", ".ndjson"]: - data = 
pl.read_ndjson( - self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True - ).to_dicts() + data = read_json(str(self._path)) elif ".bson" in str(self._path): data = IterableData(str(self._path), options={"format_in": "bson"}).iter() else: @@ -137,15 +138,15 @@ def record_count(self, filter_=None) -> int: def query(self): if self.filter: - raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using filter expressions is not supported by remote resource adapter") + if self.limit: + raise NotImplementedError("Using limit parameter is not supported by remote resource adapter") if self.offset: - raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader") + raise NotImplementedError("Using offset parameter is not supported by remote resource adapter") if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"): - data = pl.read_ndjson( - str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True - ).to_dicts() + data = read_json(str(self._url)) elif self._url.path.endswith(".bson"): - raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC") + raise NotImplementedError("HTTP+BSON loader does not support .bson files yet.") else: raise ValueError(f"Unsupported file type: {self._url}") return batches(data, self.batch_size) diff --git a/cratedb_toolkit/io/mongodb/util.py b/cratedb_toolkit/io/mongodb/util.py index 0aed69a..0df03b5 100644 --- a/cratedb_toolkit/io/mongodb/util.py +++ b/cratedb_toolkit/io/mongodb/util.py @@ -1,6 +1,7 @@ import re import typing as t +from commons_codec.transform.mongodb import Document from pymongo.cursor import Cursor from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents @@ -44,7 +45,9 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict: return d -def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]: +def batches( + data: t.Union[Cursor, Documents, t.Generator[Document, None, None]], batch_size: int = 100 +) -> t.Generator[Documents, None, None]: """ Generate batches of documents. """ diff --git a/cratedb_toolkit/util/io.py b/cratedb_toolkit/util/io.py new file mode 100644 index 0000000..f52fc43 --- /dev/null +++ b/cratedb_toolkit/util/io.py @@ -0,0 +1,15 @@ +import orjsonl +from fsspec import filesystem +from upath import UPath + + +def read_json(url: str): + """ + Read JSON file from anywhere. + TODO: Does the resource need to be closed? How? 
+ """ + p = UPath(url) + fs = filesystem(p.protocol, **p.storage_options) # equivalent to p.fs + fp = fs.open(p.path) + data = orjsonl.stream(fp) + return data diff --git a/pyproject.toml b/pyproject.toml index af3c1ff..1c90caa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,6 +94,7 @@ dependencies = [ "cratedb-sqlparse==0.0.6", 'importlib-metadata; python_version < "3.8"', 'importlib-resources; python_version < "3.9"', + "orjsonl<2", "polars<1.7", "pympler<1.2", "python-dateutil<3", @@ -154,8 +155,10 @@ influxdb = [ io = [ "cr8", "dask[dataframe]>=2020", + "fsspec[s3,http]", "pandas<3,>=1", "sqlalchemy>=2", + "universal-pathlib<0.3", ] kinesis = [ "aiobotocore<2.16", From 8d9f14f4564515c4e787fd30d8a085695ea04ceb Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 13:08:28 +0200 Subject: [PATCH 07/11] MongoDB Full: Improve initialization of transformation subsystem --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/api.py | 7 +++-- cratedb_toolkit/io/mongodb/core.py | 4 +-- cratedb_toolkit/io/mongodb/transform.py | 34 ++++++++++++++++++------- tests/io/mongodb/test_copy.py | 11 +++++++- 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 134cac1..6964ee6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ - MongoDB: Improve error reporting - MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well, use `fsspec` and `orjson` instead +- MongoDB Full: Improved initialization of transformation subsystem ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 37c7cd2..1b946f3 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -5,6 +5,7 @@ from boltons.urlutils import URL from polars.exceptions import PanicException +from zyp.model.project import TransformationProject from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB @@ -93,7 +94,7 @@ def mongodb_copy_migr8(source_url, target_url, transformation: Path = None, limi def mongodb_copy( source_url: t.Union[str, URL], target_url: t.Union[str, URL], - transformation: t.Union[Path, None] = None, + transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None, progress: bool = False, ): """ @@ -111,9 +112,7 @@ def mongodb_copy( target_url = URL(target_url) # Optionally configure transformations. - tm = None - if transformation: - tm = TransformationManager(path=transformation) + tm = TransformationManager.from_any(transformation) # Check if source address URL includes a table name or not. has_table = True diff --git a/cratedb_toolkit/io/mongodb/core.py b/cratedb_toolkit/io/mongodb/core.py index 5a34207..2d63605 100644 --- a/cratedb_toolkit/io/mongodb/core.py +++ b/cratedb_toolkit/io/mongodb/core.py @@ -92,7 +92,7 @@ def extract(args) -> t.Dict[str, t.Any]: rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan") - tm = TransformationManager(path=args.transformation) + tm = TransformationManager.from_path(path=args.transformation) schemas = OrderedDict() for collection_name in filtered_collections: collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit) @@ -121,7 +121,7 @@ def export(args) -> t.IO[bytes]: TODO: Run on multiple collections, like `extract`. 
""" - tm = TransformationManager(path=args.transformation) + tm = TransformationManager.from_path(path=args.transformation) buffer = io.BytesIO() client, db = get_mongodb_client_database(args, document_class=RawBSONDocument) collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit) diff --git a/cratedb_toolkit/io/mongodb/transform.py b/cratedb_toolkit/io/mongodb/transform.py index 13e27d5..f71dafc 100644 --- a/cratedb_toolkit/io/mongodb/transform.py +++ b/cratedb_toolkit/io/mongodb/transform.py @@ -10,17 +10,33 @@ class TransformationManager: - def __init__(self, path: Path): - self.path = path - self.active = False - if not self.path: - return - if not self.path.exists(): - raise FileNotFoundError(f"File does not exist: {self.path}") - self.project = TransformationProject.from_yaml(self.path.read_text()) - logger.info("Transformation manager initialized. File: %s", self.path) + def __init__(self, project: TransformationProject): + self.project = project self.active = True + @classmethod + def from_any(cls, transformation=None): + if transformation is None: + return None + elif isinstance(transformation, TransformationManager): + return transformation + elif isinstance(transformation, TransformationProject): + return cls(project=transformation) + elif isinstance(transformation, Path): + return cls.from_path(path=transformation) + else: + raise ValueError(f"Unable to initialize transformation manager from {type(transformation)}") + + @classmethod + def from_path(cls, path: Path): + if not path: + return None + if not path.exists(): + raise FileNotFoundError(f"File does not exist: {path}") + logger.info("Loading Zyp transformation file: %s", path) + project = TransformationProject.from_yaml(path.read_text()) + return cls(project=project) + def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]): if not self.active: return diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index a4350a2..b0e5487 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -5,6 +5,9 @@ import pymongo import pytest +from zyp import CollectionTransformation, MokshaTransformation +from zyp.model.collection import CollectionAddress +from zyp.model.project import TransformationProject from cratedb_toolkit.io.mongodb.api import mongodb_copy from tests.conftest import check_sqlalchemy2 @@ -283,7 +286,13 @@ def test_mongodb_copy_http_json_relaxed(caplog, cratedb): cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Run transfer command. - mongodb_copy(json_resource, cratedb_url) + transformation = TransformationProject().add( + CollectionTransformation( + address=CollectionAddress(container="datasets", name="books"), + pre=MokshaTransformation().jq(".[] |= (._id |= tostring)"), + ) + ) + mongodb_copy(json_resource, cratedb_url, transformation=transformation) # Verify metadata in target database. assert cratedb.database.table_exists("testdrive.demo") is True From 09e5120d3d596a22f0cf4eb22b26a13db99d0ec0 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 13:10:37 +0200 Subject: [PATCH 08/11] MongoDB Adapter: Improve performance of collection cardinality gathering ... by using `collection.estimated_document_count()`. 
---
 CHANGES.md                            | 2 ++
 cratedb_toolkit/io/mongodb/adapter.py | 4 ++++
 2 files changed, 6 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 6964ee6..f598793 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,8 @@
 - MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well,
   use `fsspec` and `orjson` instead
 - MongoDB Full: Improved initialization of transformation subsystem
+- MongoDB Adapter: Improved performance when computing collection cardinality
+  by using `collection.estimated_document_count()`
 
 ## 2024/09/19 v0.0.24
 - MongoDB Full: Refactor transformation subsystem to `commons-codec`
diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py
index 8a4629d..bc041b1 100644
--- a/cratedb_toolkit/io/mongodb/adapter.py
+++ b/cratedb_toolkit/io/mongodb/adapter.py
@@ -171,8 +171,12 @@ def get_collections(self) -> t.List[str]:
         return sorted(database.list_collection_names())
 
     def record_count(self, filter_=None) -> int:
+        """
+        # Exact. Takes too long on large collections.
         filter_ = filter_ or {}
         return self._mongodb_collection.count_documents(filter=filter_)
+        """
+        return self._mongodb_collection.estimated_document_count()
 
     def query(self):
         data = (

From ddddfb0b6642d1c7d8abc886e327018723584e8d Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Sun, 22 Sep 2024 13:13:43 +0200
Subject: [PATCH 09/11] MongoDB Full: Use `limit` parameter as number of total
 records

... when given.
---
 CHANGES.md                         | 1 +
 cratedb_toolkit/io/mongodb/copy.py | 6 +++++-
 doc/backlog.md                     | 1 +
 3 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index f598793..7e7d41c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@
 - MongoDB Full: Improved initialization of transformation subsystem
 - MongoDB Adapter: Improved performance when computing collection cardinality
   by using `collection.estimated_document_count()`
+- MongoDB Full: Optionally use `limit` parameter as number of total records
 
 ## 2024/09/19 v0.0.24
 - MongoDB Full: Refactor transformation subsystem to `commons-codec`
diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py
index a88ff00..febfbe4 100644
--- a/cratedb_toolkit/io/mongodb/copy.py
+++ b/cratedb_toolkit/io/mongodb/copy.py
@@ -71,7 +71,11 @@ def start(self):
         Read items from MongoDB table, convert to SQL INSERT statements, and submit to CrateDB.
         """
         logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
-        records_in = self.mongodb_adapter.record_count()
+        limit = self.mongodb_adapter.limit
+        if limit > 0:
+            records_in = limit
+        else:
+            records_in = self.mongodb_adapter.record_count()
         logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
         with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
             if not self.cratedb_adapter.table_exists(self.cratedb_table):
diff --git a/doc/backlog.md b/doc/backlog.md
index 026fb76..0a34d86 100644
--- a/doc/backlog.md
+++ b/doc/backlog.md
@@ -19,6 +19,7 @@
   mongoimport --uri 'mongodb+srv://MYUSERNAME:SECRETPASSWORD@mycluster-ABCDE.azure.mongodb.net/test?retryWrites=true&w=majority'
   ```
 - MongoDB: Convert dates like `"date": "Sep 18 2015"`, see `testdrive.city_inspections`.
+- Table Loader: Propagate offset/limit to progress bar
 
 ## Iteration +2
 - Address `fix_job_info_table_name`

From 4084f4eba80330d81ae4c5e27aa0f4ce015ac27a Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Sun, 22 Sep 2024 13:50:04 +0200
Subject: [PATCH 10/11] MongoDB Adapter: Evaluate filtering by `_id` by casting
 to bson.ObjectId

Native objects can't be passed using JSON, so this is a minor surrogate
representation to convey a filter that makes `ctk load table` process a
single document, identified by its OID.
---
 CHANGES.md                            |  3 ++
 cratedb_toolkit/io/mongodb/adapter.py |  9 +++--
 doc/io/mongodb/loader.md              |  5 +--
 tests/io/mongodb/test_copy.py         | 51 ++++++++++++++++++++++++++-
 4 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7e7d41c..70ec586 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,9 @@
 - MongoDB Adapter: Improved performance when computing collection cardinality
   by using `collection.estimated_document_count()`
 - MongoDB Full: Optionally use `limit` parameter as number of total records
+- MongoDB Adapter: Evaluate `_id` filter field by upcasting to `bson.ObjectId`,
+  to convey a filter that makes `ctk load table` process a single document,
+  identified by its OID
 
 ## 2024/09/19 v0.0.24
 - MongoDB Full: Refactor transformation subsystem to `commons-codec`
diff --git a/cratedb_toolkit/io/mongodb/adapter.py b/cratedb_toolkit/io/mongodb/adapter.py
index bc041b1..b9deb5c 100644
--- a/cratedb_toolkit/io/mongodb/adapter.py
+++ b/cratedb_toolkit/io/mongodb/adapter.py
@@ -4,6 +4,7 @@
 import logging
 import typing as t
 from abc import abstractmethod
+from copy import deepcopy
 from functools import cached_property
 from pathlib import Path
 
@@ -57,7 +58,7 @@ def batch_size(self) -> int:
         return int(self.address.uri.query_params.get("batch-size", 100))
 
     @cached_property
-    def filter(self) -> t.Union[str, None]:
+    def filter(self) -> t.Union[t.Dict[str, t.Any], None]:
         return json.loads(self.address.uri.query_params.get("filter", "null"))
 
     @cached_property
@@ -179,8 +180,12 @@ def record_count(self, filter_=None) -> int:
         return self._mongodb_collection.estimated_document_count()
 
     def query(self):
+        _filter = deepcopy(self.filter)
+        # Evaluate `_id` filter field specially, by upcasting to `bson.ObjectId`.
+        if _filter and "_id" in _filter:
+            _filter["_id"] = bson.ObjectId(_filter["_id"])
         data = (
-            self._mongodb_collection.find(filter=self.filter)
+            self._mongodb_collection.find(filter=_filter)
             .batch_size(self.batch_size)
             .skip(self.offset)
             .limit(self.limit)
diff --git a/doc/io/mongodb/loader.md b/doc/io/mongodb/loader.md
index db2c01b..ad638bf 100644
--- a/doc/io/mongodb/loader.md
+++ b/doc/io/mongodb/loader.md
@@ -109,8 +109,9 @@ appending the HTTP URL query parameter `batch-size` to the source URL, like
 
 ### Filter
 Use the HTTP URL query parameter `filter` on the source URL, like
-`&filter={"exchange": {"$eq": "NASDAQ"}}`, in order to provide a MongoDB
-query filter as a JSON string.
+`&filter={"exchange":{"$eq":"NASDAQ"}}`, or
+`&filter={"_id":"66f0002e98c00fb8261d87c8"}`,
+in order to provide a MongoDB query filter as a JSON string.
 It works in the same way as `mongoexport`'s `--query` option.
 For more complex query expressions, make sure to properly encode the right
 value using URL/Percent Encoding.
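
A sketch of the percent-encoding advice from the "Filter" section above; the
host, schema, and table labels are placeholders:

```python
# Sketch: assemble a source URL carrying a percent-encoded MongoDB filter
# expression; host, database, and collection names are placeholders.
import json
from urllib.parse import quote

filter_expression = json.dumps({"exchange": {"$eq": "NASDAQ"}})
mongodb_url = f"mongodb://localhost:27017/ticker/stocks?filter={quote(filter_expression)}"
print(mongodb_url)
# mongodb://localhost:27017/ticker/stocks?filter=%7B%22exchange%22%3A%20%7B%22%24eq%22%3A%20%22NASDAQ%22%7D%7D
```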
diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index b0e5487..7c9a11d 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -3,6 +3,7 @@ from pathlib import Path from unittest import mock +import bson import pymongo import pytest from zyp import CollectionTransformation, MokshaTransformation @@ -64,7 +65,7 @@ def test_mongodb_copy_server_database(caplog, cratedb, mongodb): assert results[0]["data"] == data_out -def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb): +def test_mongodb_copy_server_collection_with_filter_timestamp(caplog, cratedb, mongodb): """ Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering. """ @@ -102,6 +103,54 @@ def test_mongodb_copy_server_collection_with_filter(caplog, cratedb, mongodb): assert results[0]["data"] == data_out[1] +def test_mongodb_copy_server_collection_with_filter_oid(caplog, cratedb, mongodb): + """ + Verify MongoDB -> CrateDB data transfer for a specific collection, with filtering by oid. + """ + + # Define source and target URLs. + filter_expression = json.dumps({"_id": "66f0002e98c00fb8261d87c8"}) + mongodb_url = f"{mongodb.get_connection_url()}/testdrive/demo?filter={filter_expression}" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Define data. + data_in = [ + { + "_id": bson.ObjectId("66efff8de45f4f7b695134a6"), + "device": "Räuber", + "temperature": 42.42, + "timestamp": 1563051934000, + }, + { + "_id": bson.ObjectId("66f0002e98c00fb8261d87c8"), + "device": "Hotzenplotz", + "temperature": 84.84, + "timestamp": 1563051934100, + }, + ] + data_out = deepcopy(data_in) + data_out[0].update({"_id": mock.ANY}) + data_out[1].update({"_id": mock.ANY}) + + # Populate source database. + client: pymongo.MongoClient = mongodb.get_connection_client() + testdrive = client.get_database("testdrive") + demo = testdrive.create_collection("demo") + demo.insert_many(data_in) + + # Run transfer command. + mongodb_copy( + mongodb_url, + cratedb_url, + ) + + # Verify data in target database. + cratedb.database.refresh_table("testdrive.demo") + assert cratedb.database.count_records("testdrive.demo") == 1 + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) + assert results[0]["data"] == data_out[1] + + def test_mongodb_copy_filesystem_folder_absolute(caplog, cratedb, mongodb): """ Verify MongoDB -> CrateDB data transfer for all files in a folder, with relative addressing. 
From d89087fe6ecb58cc4f5dbce21fbe918aa271af13 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 22 Sep 2024 13:50:34 +0200 Subject: [PATCH 11/11] MongoDB Dependencies: Update to commons-codec 0.0.17 --- CHANGES.md | 1 + pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 70ec586..17acc5a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,7 @@ - MongoDB Adapter: Evaluate `_id` filter field by upcasting to `bson.ObjectId`, to convey a filter that makes `ctk load table` process a single document, identified by its OID +- MongoDB Dependencies: Update to commons-codec 0.0.17 ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/pyproject.toml b/pyproject.toml index 1c90caa..58705ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -167,7 +167,7 @@ kinesis = [ "lorrystream[carabas]>=0.0.6", ] mongodb = [ - "commons-codec[mongodb,zyp]>=0.0.16", + "commons-codec[mongodb,zyp]>=0.0.17", "cratedb-toolkit[io]", "orjson<4,>=3.3.1", "pymongo<5,>=3.10.1",