From fc214a03826dc3cf45bcf7a9f2f85e6b265a1d92 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 10:24:58 +0200 Subject: [PATCH] MongoDB: Improve error handling wrt. bulk operations vs. usability In order to have both, efficient bulk insert operations, and on-the-spot error messages on records that fail to insert, let's introduce a two-stage approach: First, try to insert a batch. When it fails, determine invalid records and insert them one-by-one, in order to relay corresponding error messages to the user. --- CHANGES.md | 1 + cratedb_toolkit/io/mongodb/api.py | 4 +- cratedb_toolkit/io/mongodb/copy.py | 74 ++++++++++++++++++-------- cratedb_toolkit/sqlalchemy/__init__.py | 0 cratedb_toolkit/sqlalchemy/patch.py | 19 +++++++ cratedb_toolkit/util/database.py | 65 ++++++++++++++++++++++ tests/io/mongodb/test_copy.py | 22 +++++++- 7 files changed, 160 insertions(+), 25 deletions(-) create mode 100644 cratedb_toolkit/sqlalchemy/__init__.py create mode 100644 cratedb_toolkit/sqlalchemy/patch.py diff --git a/CHANGES.md b/CHANGES.md index e1b61217..f16e011d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ or from filesystem directory - MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://` - MongoDB: Optionally filter server collection using MongoDB query expression +- MongoDB: Improve error handling wrt. bulk operations vs. usability ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py index 83a9f30f..12c05cad 100644 --- a/cratedb_toolkit/io/mongodb/api.py +++ b/cratedb_toolkit/io/mongodb/api.py @@ -168,8 +168,8 @@ def mongodb_copy( for task in tasks: try: outcome_task = task.start() - except (Exception, PanicException): - logger.exception("Task failed") + except (Exception, PanicException) as ex: + logger.error(f"Task failed: {ex}") outcome_task = False outcome = outcome and outcome_task diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py index 893a4282..0a7686fb 100644 --- a/cratedb_toolkit/io/mongodb/copy.py +++ b/cratedb_toolkit/io/mongodb/copy.py @@ -7,6 +7,7 @@ from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB from pymongo.cursor import Cursor +from sqlalchemy.exc import ProgrammingError from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm from zyp.model.collection import CollectionAddress @@ -16,7 +17,9 @@ from cratedb_toolkit.io.mongodb.model import DocumentDict from cratedb_toolkit.io.mongodb.transform import TransformationManager from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.database import BulkResponse logger = logging.getLogger(__name__) @@ -44,9 +47,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat """ Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. """ - if isinstance(data, Cursor): - data = list(data) - if not isinstance(data, list): + if not isinstance(data, Cursor) and not isinstance(data, list): data = [data] # Define SQL INSERT statement. @@ -72,10 +73,12 @@ def __init__( mongodb_url: t.Union[str, URL], cratedb_url: t.Union[str, URL], tm: t.Union[TransformationManager, None], - on_error: t.Literal["ignore", "raise"] = "raise", + on_error: t.Literal["ignore", "raise"] = "ignore", progress: bool = False, debug: bool = True, ): + monkeypatch_executemany() + self.mongodb_uri = URL(mongodb_url) self.cratedb_uri = URL(cratedb_url) @@ -124,10 +127,14 @@ def start(self): records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") progress_bar = tqdm(total=records_in) - records_out: int = 0 + + count_success_total: int = 0 + count_error_total: int = 0 # Acquire batches of documents, convert to SQL operations, and submit to CrateDB. for documents in self.mongodb_adapter.query(): + current_batch_size = len(documents) + progress_bar.set_description("ACQUIRE") try: @@ -138,27 +145,50 @@ def start(self): raise continue - # Submit operation to CrateDB. + statement = sa.text(operation.statement) + + # Submit operation to CrateDB, using `bulk_args`. progress_bar.set_description("SUBMIT ") try: - result = connection.execute(sa.text(operation.statement), operation.parameters) - result_size = result.rowcount - if result_size < 0: - raise IOError("Unable to insert one or more records") - records_out += result_size - progress_bar.update(n=result_size) - except Exception as ex: - logger_on_error( - f"Executing operation failed: {ex}\n" - f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]" + cursor = connection.execute(statement=statement, parameters=operation.parameters) + connection.commit() + cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None) + bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result) + failed_records = bulk_response.failed_records + count_success_local = bulk_response.success_count + count_success_total += bulk_response.success_count + progress_bar.update(n=bulk_response.success_count) + + # When a batch is of size one, an exception is raised. + # Just signal the same condition as if a batch would have failed. + except ProgrammingError: + failed_records = [operation.parameters] + count_success_local = 0 + + # When bulk operations fail, try inserting failed records record-by-record, + # in order to relay proper error messages to the user. + if failed_records: + logger.warning( + f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. " + f"Falling back to per-record operations." ) - if self.on_error == "raise": - raise - continue + for record in failed_records: + try: + connection.execute(statement=statement, parameters=record) + connection.commit() + count_success_total += 1 + except Exception as ex: + logger.warning(f"Operation failed: {ex}") + logger.debug(f"Failing record: {record}") + count_error_total += 1 + if self.on_error == "raise": + raise + progress_bar.update(n=1) progress_bar.close() - connection.commit() - logger.info(f"Number of records written: {records_out}") - if records_out == 0: + + logger.info(f"Number of records written: success={count_success_total}, error={count_error_total}") + if count_success_total == 0: logger.warning("No data has been copied") + return True diff --git a/cratedb_toolkit/sqlalchemy/__init__.py b/cratedb_toolkit/sqlalchemy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py new file mode 100644 index 00000000..9537f5bc --- /dev/null +++ b/cratedb_toolkit/sqlalchemy/patch.py @@ -0,0 +1,19 @@ +from sqlalchemy_cratedb import dialect + + +def do_executemany(self, cursor, statement, parameters, context=None): + """ + Improved version of `do_executemany` that stores its response into the request context instance. + + TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`. + """ + result = cursor.executemany(statement, parameters) + if context is not None: + context.last_executemany_result = result + + +def monkeypatch_executemany(): + """ + Enable improved version of `do_executemany`. + """ + dialect.do_executemany = do_executemany diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index 0d4191d8..88a76536 100644 --- a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -4,10 +4,12 @@ import logging import os import typing as t +from functools import cached_property from pathlib import Path import sqlalchemy as sa import sqlparse +from attrs import define from boltons.urlutils import URL from cratedb_sqlparse import sqlparse as sqlparse_cratedb from sqlalchemy.exc import ProgrammingError @@ -428,3 +430,66 @@ def get_table_names(sql: str) -> t.List[t.List[str]]: local_names.append(table.name) names.append(local_names) return names + + +class BulkResultItem(t.TypedDict): + """ + Define the shape of a CrateDB bulk request response item. + """ + + rowcount: int + + +@define +class BulkResponse: + """ + Manage CrateDB bulk request responses. + Accepts a list of bulk arguments (parameter list) and a list of bulk response items. + + https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations + + TODO: Think about refactoring this to `sqlalchemy_cratedb.support`. + """ + + parameters: t.Union[t.List[t.Dict[str, t.Any]], None] + cratedb_bulk_result: t.Union[t.List[BulkResultItem], None] + + @cached_property + def failed_records(self) -> t.List[t.Dict[str, t.Any]]: + """ + Compute list of failed records. + + CrateDB signals failed insert using `rowcount=-2`. + + https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling + """ + if self.parameters is None or self.cratedb_bulk_result is None: + return [] + errors: t.List[t.Dict[str, t.Any]] = [] + for record, status in zip(self.parameters, self.cratedb_bulk_result): + if status["rowcount"] == -2: + errors.append(record) + return errors + + @cached_property + def parameter_count(self) -> int: + """ + Compute bulk size / length of parameter list. + """ + if not self.parameters: + return 0 + return len(self.parameters) + + @cached_property + def success_count(self) -> int: + """ + Compute number of succeeding records within a batch. + """ + return self.parameter_count - self.failed_count + + @cached_property + def failed_count(self) -> int: + """ + Compute number of failed records within a batch. + """ + return len(self.failed_records) diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py index 36ac8c1c..446f77df 100644 --- a/tests/io/mongodb/test_copy.py +++ b/tests/io/mongodb/test_copy.py @@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb): assert cratedb.database.count_records("testdrive.books-relaxed") == 4 -def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb): +def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb): """ Verify MongoDB Extended JSON -> CrateDB data transfer. """ @@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb): assert timestamp_type == "bigint" +def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb): + """ + Verify MongoDB Extended JSON -> CrateDB data transfer, which should omit a warning on an invalid record. + """ + + # Define source and target URLs. + json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Run transfer command. + mongodb_copy(json_resource, cratedb_url) + + # Verify metadata in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 1 + + assert "Dynamic nested arrays are not supported" in caplog.text + + def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb): """ Verify MongoDB Extended JSON -> CrateDB data transfer.