Skip to content

Commit

Permalink
MongoDB: Improve error handling wrt. bulk operations vs. usability
Browse files Browse the repository at this point in the history
In order to have both, efficient bulk insert operations, and on-the-spot
error messages on records that fail to insert, let's introduce a
two-stage approach:

First, try to insert a batch. When it fails, determine invalid records
and insert them one-by-one, in order to relay corresponding error
messages to the user.
  • Loading branch information
amotl committed Sep 13, 2024
1 parent 9e52464 commit fc214a0
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ def mongodb_copy(
for task in tasks:
try:
outcome_task = task.start()
except (Exception, PanicException):
logger.exception("Task failed")
except (Exception, PanicException) as ex:
logger.error(f"Task failed: {ex}")

Check warning on line 172 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L171-L172

Added lines #L171 - L172 were not covered by tests
outcome_task = False
outcome = outcome and outcome_task

Expand Down
74 changes: 52 additions & 22 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from pymongo.cursor import Cursor
from sqlalchemy.exc import ProgrammingError

Check warning on line 10 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L10

Added line #L10 was not covered by tests
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from zyp.model.collection import CollectionAddress
Expand All @@ -16,7 +17,9 @@
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany

Check warning on line 20 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L20

Added line #L20 was not covered by tests
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.database import BulkResponse

Check warning on line 22 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L22

Added line #L22 was not covered by tests

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,9 +47,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if isinstance(data, Cursor):
data = list(data)
if not isinstance(data, list):
if not isinstance(data, Cursor) and not isinstance(data, list):

Check warning on line 50 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L50

Added line #L50 was not covered by tests
data = [data]

# Define SQL INSERT statement.
Expand All @@ -72,10 +73,12 @@ def __init__(
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "raise",
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

Check warning on line 80 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L80

Added line #L80 was not covered by tests

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Expand Down Expand Up @@ -124,10 +127,14 @@ def start(self):
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out: int = 0

count_success_total: int = 0
count_error_total: int = 0

Check warning on line 132 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L131-L132

Added lines #L131 - L132 were not covered by tests

# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
for documents in self.mongodb_adapter.query():
current_batch_size = len(documents)

Check warning on line 136 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L136

Added line #L136 was not covered by tests

progress_bar.set_description("ACQUIRE")

try:
Expand All @@ -138,27 +145,50 @@ def start(self):
raise
continue

# Submit operation to CrateDB.
statement = sa.text(operation.statement)

Check warning on line 148 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L148

Added line #L148 was not covered by tests

# Submit operation to CrateDB, using `bulk_args`.
progress_bar.set_description("SUBMIT ")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise IOError("Unable to insert one or more records")
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(
f"Executing operation failed: {ex}\n"
f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
cursor = connection.execute(statement=statement, parameters=operation.parameters)
connection.commit()
cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
count_success_total += bulk_response.success_count
progress_bar.update(n=bulk_response.success_count)

Check warning on line 160 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L153-L160

Added lines #L153 - L160 were not covered by tests

# When a batch is of size one, an exception is raised.
# Just signal the same condition as if a batch would have failed.
except ProgrammingError:
failed_records = [operation.parameters]
count_success_local = 0

Check warning on line 166 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L164-L166

Added lines #L164 - L166 were not covered by tests

# When bulk operations fail, try inserting failed records record-by-record,
# in order to relay proper error messages to the user.
if failed_records:
logger.warning(

Check warning on line 171 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L170-L171

Added lines #L170 - L171 were not covered by tests
f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
f"Falling back to per-record operations."
)
if self.on_error == "raise":
raise
continue
for record in failed_records:
try:
connection.execute(statement=statement, parameters=record)
connection.commit()
count_success_total += 1
except Exception as ex:
logger.warning(f"Operation failed: {ex}")
logger.debug(f"Failing record: {record}")
count_error_total += 1
if self.on_error == "raise":
raise
progress_bar.update(n=1)

Check warning on line 186 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L175-L186

Added lines #L175 - L186 were not covered by tests

progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

logger.info(f"Number of records written: success={count_success_total}, error={count_error_total}")
if count_success_total == 0:

Check warning on line 191 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L190-L191

Added lines #L190 - L191 were not covered by tests
logger.warning("No data has been copied")

return True
Empty file.
19 changes: 19 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from sqlalchemy_cratedb import dialect

Check warning on line 1 in cratedb_toolkit/sqlalchemy/patch.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/sqlalchemy/patch.py#L1

Added line #L1 was not covered by tests


def do_executemany(self, cursor, statement, parameters, context=None):

Check warning on line 4 in cratedb_toolkit/sqlalchemy/patch.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/sqlalchemy/patch.py#L4

Added line #L4 was not covered by tests
"""
Improved version of `do_executemany` that stores its response into the request context instance.
TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
"""
result = cursor.executemany(statement, parameters)
if context is not None:
context.last_executemany_result = result

Check warning on line 12 in cratedb_toolkit/sqlalchemy/patch.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/sqlalchemy/patch.py#L10-L12

Added lines #L10 - L12 were not covered by tests


def monkeypatch_executemany():

Check warning on line 15 in cratedb_toolkit/sqlalchemy/patch.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/sqlalchemy/patch.py#L15

Added line #L15 was not covered by tests
"""
Enable improved version of `do_executemany`.
"""
dialect.do_executemany = do_executemany

Check warning on line 19 in cratedb_toolkit/sqlalchemy/patch.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/sqlalchemy/patch.py#L19

Added line #L19 was not covered by tests
65 changes: 65 additions & 0 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import logging
import os
import typing as t
from functools import cached_property

Check warning on line 7 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L7

Added line #L7 was not covered by tests
from pathlib import Path

import sqlalchemy as sa
import sqlparse
from attrs import define

Check warning on line 12 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L12

Added line #L12 was not covered by tests
from boltons.urlutils import URL
from cratedb_sqlparse import sqlparse as sqlparse_cratedb
from sqlalchemy.exc import ProgrammingError
Expand Down Expand Up @@ -428,3 +430,66 @@ def get_table_names(sql: str) -> t.List[t.List[str]]:
local_names.append(table.name)
names.append(local_names)
return names


class BulkResultItem(t.TypedDict):

Check warning on line 435 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L435

Added line #L435 was not covered by tests
"""
Define the shape of a CrateDB bulk request response item.
"""

rowcount: int

Check warning on line 440 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L440

Added line #L440 was not covered by tests


@define
class BulkResponse:

Check warning on line 444 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L443-L444

Added lines #L443 - L444 were not covered by tests
"""
Manage CrateDB bulk request responses.
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
TODO: Think about refactoring this to `sqlalchemy_cratedb.support`.
"""

parameters: t.Union[t.List[t.Dict[str, t.Any]], None]
cratedb_bulk_result: t.Union[t.List[BulkResultItem], None]

Check warning on line 455 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L454-L455

Added lines #L454 - L455 were not covered by tests

@cached_property
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:

Check warning on line 458 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L457-L458

Added lines #L457 - L458 were not covered by tests
"""
Compute list of failed records.
CrateDB signals failed insert using `rowcount=-2`.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
"""
if self.parameters is None or self.cratedb_bulk_result is None:
return []
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.parameters, self.cratedb_bulk_result):
if status["rowcount"] == -2:
errors.append(record)
return errors

Check warning on line 472 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L466-L472

Added lines #L466 - L472 were not covered by tests

@cached_property
def parameter_count(self) -> int:

Check warning on line 475 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L474-L475

Added lines #L474 - L475 were not covered by tests
"""
Compute bulk size / length of parameter list.
"""
if not self.parameters:
return 0
return len(self.parameters)

Check warning on line 481 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L479-L481

Added lines #L479 - L481 were not covered by tests

@cached_property
def success_count(self) -> int:

Check warning on line 484 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L483-L484

Added lines #L483 - L484 were not covered by tests
"""
Compute number of succeeding records within a batch.
"""
return self.parameter_count - self.failed_count

Check warning on line 488 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L488

Added line #L488 was not covered by tests

@cached_property
def failed_count(self) -> int:

Check warning on line 491 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L490-L491

Added lines #L490 - L491 were not covered by tests
"""
Compute number of failed records within a batch.
"""
return len(self.failed_records)

Check warning on line 495 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L495

Added line #L495 was not covered by tests
22 changes: 21 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
assert cratedb.database.count_records("testdrive.books-relaxed") == 4


def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
"""
Expand Down Expand Up @@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer, which should omit a warning on an invalid record.
"""

# Define source and target URLs.
json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Run transfer command.
mongodb_copy(json_resource, cratedb_url)

# Verify metadata in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 1

assert "Dynamic nested arrays are not supported" in caplog.text


def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
Expand Down

0 comments on commit fc214a0

Please sign in to comment.