Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table Loader and MongoDB: A few refinements #270

Merged
merged 11 commits into from
Sep 22, 2024
Merged
13 changes: 13 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@


## Unreleased
- Table Loader: Improved conditional handling of "transformation" parameter
- Table Loader: Improved status reporting and error logging in `BulkProcessor`
- MongoDB: Improved error reporting
- MongoDB Full: Polars' `read_ndjson` doesn't load MongoDB JSON data well,
so use `fsspec` and `orjson` instead
- MongoDB Full: Improved initialization of transformation subsystem
- MongoDB Adapter: Improved performance when computing collection cardinality
by using `collection.estimated_document_count()`
- MongoDB Full: Optionally use `limit` parameter as number of total records
- MongoDB Adapter: Evaluate `_id` filter field by upcasting to `bson.ObjectId`,
to convey a filter that makes `ctk load table` process a single document,
identified by its OID
- MongoDB Dependencies: Update to commons-codec 0.0.17

## 2024/09/19 v0.0.24
- MongoDB Full: Refactor transformation subsystem to `commons-codec`
Expand Down
12 changes: 8 additions & 4 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

class ClusterBase(abc.ABC):
@abstractmethod
def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path):
def load_table(
self, resource: InputOutputResource, target: TableAddress, transformation: t.Union[Path, None] = None
):
raise NotImplementedError("Child class needs to implement this method")


Expand All @@ -40,7 +42,7 @@
self,
resource: InputOutputResource,
target: t.Optional[TableAddress] = None,
transformation: Path = None,
transformation: t.Union[Path, None] = None,
):
"""
Load data into a database table on CrateDB Cloud.
Expand Down Expand Up @@ -102,7 +104,9 @@
address: DatabaseAddress
info: t.Optional[ClusterInformation] = None

def load_table(self, resource: InputOutputResource, target: TableAddress, transformation: Path = None):
def load_table(
self, resource: InputOutputResource, target: TableAddress, transformation: t.Union[Path, None] = None
):
"""
Load data into a database table on a standalone CrateDB Server.

Expand Down Expand Up @@ -168,4 +172,4 @@
return False

else:
raise NotImplementedError("Importing resource not implemented yet")
raise NotImplementedError(f"Importing resource not implemented yet: {source_url_obj}")

Check warning on line 175 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L175

Added line #L175 was not covered by tests
7 changes: 6 additions & 1 deletion cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import sys
import typing as t
from pathlib import Path

import click
Expand Down Expand Up @@ -49,7 +50,7 @@
table: str,
format_: str,
compression: str,
transformation: Path,
transformation: t.Union[Path, None],
):
"""
Import data into CrateDB and CrateDB Cloud clusters.
Expand All @@ -64,6 +65,10 @@
if not cluster_id and not cratedb_sqlalchemy_url and not cratedb_http_url:
raise KeyError(error_message)

# When `--transformation` is given, but empty, fix it.
if transformation is not None and transformation.name == "":
transformation = None

Check warning on line 70 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L70

Added line #L70 was not covered by tests

# When SQLAlchemy URL is not given, but HTTP URL is, compute the former on demand.
if cluster_id:
address = None
Expand Down
16 changes: 9 additions & 7 deletions cratedb_toolkit/io/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration?
import json
import typing as t
from functools import cached_property

Expand Down Expand Up @@ -120,14 +121,15 @@

def start(self) -> BulkMetrics:
# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
batch_count = 0
for batch in self.data:
batch_count += 1
self.progress_bar and self.progress_bar.set_description("READ ")
current_batch_size = len(batch)

self.progress_bar and self.progress_bar.set_description("ACQUIRE")

try:
operation = self.batch_to_operation(batch)
except Exception as ex:
self._metrics.count_error_total += current_batch_size

Check warning on line 132 in cratedb_toolkit/io/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/core.py#L132

Added line #L132 was not covered by tests
self.log_level(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
Expand All @@ -137,7 +139,7 @@
statement = sa.text(operation.statement)

# Submit operation to CrateDB, using `bulk_args`.
self.progress_bar and self.progress_bar.set_description("SUBMIT ")
self.progress_bar and self.progress_bar.set_description("WRITE")
try:
cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
self.connection.commit()
Expand All @@ -158,7 +160,7 @@
# in order to relay proper error messages to the user.
if failed_records:
logger.warning(
f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
f"Incomplete batch #{batch_count}. Records processed: {count_success_local}/{current_batch_size}. "
f"Falling back to per-record operations."
)
for record in failed_records:
Expand All @@ -167,8 +169,8 @@
self.connection.commit()
self._metrics.count_success_total += 1
except Exception as ex:
logger.warning(f"Operation failed: {ex}")
logger.debug(f"Failing record: {record}")
logger.error(f"Operation failed: {ex}")
logger.debug(f"Invalid record:\n{json.dumps(record, indent=2)}")
self._metrics.count_error_total += 1
self._metrics.bytes_error_total += asizeof(record)
if self.on_error == "raise":
Expand Down
38 changes: 24 additions & 14 deletions cratedb_toolkit/io/mongodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import logging
import typing as t
from abc import abstractmethod
from copy import deepcopy
from functools import cached_property
from pathlib import Path

import boltons.urlutils
import polars as pl
import bson
import pymongo
import yarl
from attrs import define, field
Expand All @@ -18,6 +19,7 @@

from cratedb_toolkit.io.mongodb.util import batches
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util.io import read_json

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,7 +58,7 @@
return int(self.address.uri.query_params.get("batch-size", 100))

@cached_property
def filter(self) -> t.Union[str, None]:
def filter(self) -> t.Union[t.Dict[str, t.Any], None]:
return json.loads(self.address.uri.query_params.get("filter", "null"))

@cached_property
Expand Down Expand Up @@ -106,13 +108,13 @@
if not self._path.exists():
raise FileNotFoundError(f"Resource not found: {self._path}")
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using filter expressions is not supported by filesystem adapter")

Check warning on line 111 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L111

Added line #L111 was not covered by tests
if self.limit:
raise NotImplementedError("Using limit parameter is not supported by filesystem adapter")

Check warning on line 113 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L113

Added line #L113 was not covered by tests
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using offset parameter is not supported by filesystem adapter")

Check warning on line 115 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L115

Added line #L115 was not covered by tests
if self._path.suffix in [".json", ".jsonl", ".ndjson"]:
data = pl.read_ndjson(
self._path, batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
data = read_json(str(self._path))
elif ".bson" in str(self._path):
data = IterableData(str(self._path), options={"format_in": "bson"}).iter()
else:
Expand All @@ -137,15 +139,15 @@

def query(self):
if self.filter:
raise NotImplementedError("Using MongoDB filter expressions is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using filter expressions is not supported by remote resource adapter")

Check warning on line 142 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L142

Added line #L142 was not covered by tests
if self.limit:
raise NotImplementedError("Using limit parameter is not supported by remote resource adapter")

Check warning on line 144 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L144

Added line #L144 was not covered by tests
if self.offset:
raise NotImplementedError("Using offsets is not supported by Polars' NDJSON reader")
raise NotImplementedError("Using offset parameter is not supported by remote resource adapter")

Check warning on line 146 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L146

Added line #L146 was not covered by tests
if self._url.path.endswith(".json") or self._url.path.endswith(".jsonl") or self._url.path.endswith(".ndjson"):
data = pl.read_ndjson(
str(self._url), batch_size=self.batch_size, n_rows=self.limit or None, ignore_errors=True
).to_dicts()
data = read_json(str(self._url))
elif self._url.path.endswith(".bson"):
raise NotImplementedError("HTTP+BSON loader does not support .bson files yet. SIC")
raise NotImplementedError("HTTP+BSON loader does not support .bson files yet.")

Check warning on line 150 in cratedb_toolkit/io/mongodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/adapter.py#L150

Added line #L150 was not covered by tests
else:
raise ValueError(f"Unsupported file type: {self._url}")
return batches(data, self.batch_size)
Expand All @@ -170,12 +172,20 @@
return sorted(database.list_collection_names())

def record_count(self, filter_=None) -> int:
"""
# Exact. Takes too long on large collections.
filter_ = filter_ or {}
return self._mongodb_collection.count_documents(filter=filter_)
"""
return self._mongodb_collection.estimated_document_count()

def query(self):
_filter = deepcopy(self.filter)
# Evaluate `_id` filter field specially, by upcasting to `bson.ObjectId`.
if _filter and "_id" in _filter:
_filter["_id"] = bson.ObjectId(_filter["_id"])
data = (
self._mongodb_collection.find(filter=self.filter)
self._mongodb_collection.find(filter=_filter)
.batch_size(self.batch_size)
.skip(self.offset)
.limit(self.limit)
Expand Down
9 changes: 4 additions & 5 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from boltons.urlutils import URL
from polars.exceptions import PanicException
from zyp.model.project import TransformationProject

from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.cdc import MongoDBCDCRelayCrateDB
Expand Down Expand Up @@ -93,7 +94,7 @@
def mongodb_copy(
source_url: t.Union[str, URL],
target_url: t.Union[str, URL],
transformation: t.Union[Path, None] = None,
transformation: t.Union[Path, TransformationManager, TransformationProject, None] = None,
progress: bool = False,
):
"""
Expand All @@ -111,9 +112,7 @@
target_url = URL(target_url)

# Optionally configure transformations.
tm = None
if transformation:
tm = TransformationManager(path=transformation)
tm = TransformationManager.from_any(transformation)

# Check if source address URL includes a table name or not.
has_table = True
Expand Down Expand Up @@ -169,7 +168,7 @@
try:
outcome_task = task.start()
except (Exception, PanicException) as ex:
logger.error(f"Task failed: {ex}")
logger.exception(f"Task failed: {ex}")

Check warning on line 171 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L171

Added line #L171 was not covered by tests
outcome_task = False
outcome = outcome and outcome_task

Expand Down
6 changes: 5 additions & 1 deletion cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@
Read items from MongoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
records_in = self.mongodb_adapter.record_count()
limit = self.mongodb_adapter.limit
if limit > 0:
records_in = limit

Check warning on line 76 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L76

Added line #L76 was not covered by tests
else:
records_in = self.mongodb_adapter.record_count()
logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
if not self.cratedb_adapter.table_exists(self.cratedb_table):
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@

rich.print(f"\nExecuting a [red bold]{'partial' if partial else 'full'}[/red bold] scan")

tm = TransformationManager(path=args.transformation)
tm = TransformationManager.from_path(path=args.transformation)

Check warning on line 95 in cratedb_toolkit/io/mongodb/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/core.py#L95

Added line #L95 was not covered by tests
schemas = OrderedDict()
for collection_name in filtered_collections:
collection_schema = extract_schema_from_collection(db[collection_name], partial, limit=args.limit)
Expand Down Expand Up @@ -121,7 +121,7 @@

TODO: Run on multiple collections, like `extract`.
"""
tm = TransformationManager(path=args.transformation)
tm = TransformationManager.from_path(path=args.transformation)

Check warning on line 124 in cratedb_toolkit/io/mongodb/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/core.py#L124

Added line #L124 was not covered by tests
buffer = io.BytesIO()
client, db = get_mongodb_client_database(args, document_class=RawBSONDocument)
collection_to_json(db[args.collection], fp=buffer, tm=tm, limit=args.limit)
Expand Down
34 changes: 25 additions & 9 deletions cratedb_toolkit/io/mongodb/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,33 @@


class TransformationManager:
def __init__(self, path: Path):
self.path = path
self.active = False
if not self.path:
return
if not self.path.exists():
raise FileNotFoundError(f"File does not exist: {self.path}")
self.project = TransformationProject.from_yaml(self.path.read_text())
logger.info("Transformation manager initialized. File: %s", self.path)
def __init__(self, project: TransformationProject):
self.project = project
self.active = True

@classmethod
def from_any(cls, transformation=None):
if transformation is None:
return None
elif isinstance(transformation, TransformationManager):
return transformation

Check warning on line 22 in cratedb_toolkit/io/mongodb/transform.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/transform.py#L22

Added line #L22 was not covered by tests
elif isinstance(transformation, TransformationProject):
return cls(project=transformation)
elif isinstance(transformation, Path):
return cls.from_path(path=transformation)
else:
raise ValueError(f"Unable to initialize transformation manager from {type(transformation)}")

Check warning on line 28 in cratedb_toolkit/io/mongodb/transform.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/transform.py#L28

Added line #L28 was not covered by tests

@classmethod
def from_path(cls, path: Path):
if not path:
return None

Check warning on line 33 in cratedb_toolkit/io/mongodb/transform.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/transform.py#L33

Added line #L33 was not covered by tests
if not path.exists():
raise FileNotFoundError(f"File does not exist: {path}")

Check warning on line 35 in cratedb_toolkit/io/mongodb/transform.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/transform.py#L35

Added line #L35 was not covered by tests
logger.info("Loading Zyp transformation file: %s", path)
project = TransformationProject.from_yaml(path.read_text())
return cls(project=project)

def apply_type_overrides(self, database_name: str, collection_name: str, collection_schema: t.Dict[str, t.Any]):
if not self.active:
return
Expand Down
5 changes: 4 additions & 1 deletion cratedb_toolkit/io/mongodb/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
import typing as t

from commons_codec.transform.mongodb import Document
from pymongo.cursor import Cursor

from cratedb_toolkit.io.mongodb.model import DocumentDict, Documents
Expand Down Expand Up @@ -44,7 +45,9 @@ def sanitize_field_names(data: DocumentDict) -> DocumentDict:
return d


def batches(data: t.Union[Cursor, Documents], batch_size: int = 100) -> t.Generator[Documents, None, None]:
def batches(
data: t.Union[Cursor, Documents, t.Generator[Document, None, None]], batch_size: int = 100
) -> t.Generator[Documents, None, None]:
"""
Generate batches of documents.
"""
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/util/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def boot_click(ctx: click.Context, verbose: bool = False, debug: bool = False):
log_level = logging.DEBUG

# Setup logging, according to `verbose` / `debug` flags.
setup_logging(level=log_level, verbose=verbose)
setup_logging(level=log_level, verbose=verbose, debug=debug)


def split_list(value: str, delimiter: str = ",") -> t.List[str]:
Expand Down
Loading