MongoDB: Improve error handling wrt. bulk operations vs. usability
In order to have both efficient bulk insert operations and on-the-spot
error messages for records that fail to insert, let's introduce a
two-stage approach:

First, try to insert a batch. When it fails, determine the invalid records
and insert them one-by-one, in order to relay the corresponding error
messages to the user.
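
A minimal sketch of that strategy (all names here are illustrative and not part of this changeset):

def two_stage_insert(insert_batch, insert_single, records):
    """
    Illustrative sketch of the two-stage approach.
    Stage 1: submit the whole batch; `insert_batch` returns the rejected records.
    Stage 2: insert each rejected record individually, so every failure
             yields a specific, user-facing error message.
    """
    failed_records = insert_batch(records)
    for record in failed_records:
        try:
            insert_single(record)
        except Exception as ex:
            print(f"Record failed to insert: {record!r}. Reason: {ex}")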
amotl committed Sep 13, 2024
1 parent 94b52a8 commit 52c1bca
Showing 9 changed files with 227 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -6,6 +6,7 @@
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
@@ -168,8 +168,8 @@ def mongodb_copy(
for task in tasks:
try:
outcome_task = task.start()
except (Exception, PanicException):
logger.exception("Task failed")
except (Exception, PanicException) as ex:
logger.error(f"Task failed: {ex}")

outcome_task = False
outcome = outcome and outcome_task

67 changes: 24 additions & 43 deletions cratedb_toolkit/io/mongodb/copy.py
@@ -16,7 +16,9 @@
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.database import BulkProcessor

logger = logging.getLogger(__name__)

@@ -44,9 +46,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if isinstance(data, Cursor):
data = list(data)
if not isinstance(data, list):
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
@@ -72,10 +72,12 @@ def __init__(
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "raise",
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

@@ -114,51 +116,30 @@ def start(self):
logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
records_in = self.mongodb_adapter.record_count()
logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out: int = 0

# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
for documents in self.mongodb_adapter.query():
progress_bar.set_description("ACQUIRE")

try:
operation = self.translator.to_sql(documents)
except Exception as ex:
logger_on_error(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
continue

# Submit operation to CrateDB.
progress_bar.set_description("SUBMIT ")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise IOError("Unable to insert one or more records")
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(
f"Executing operation failed: {ex}\n"
f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
)
if self.on_error == "raise":
raise
continue

progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

processor = BulkProcessor(
connection=connection,
data=self.mongodb_adapter.query(),
batch_to_operation=self.translator.to_sql,
progress_bar=progress_bar,
on_error=self.on_error,
debug=self.debug,
)
metrics = processor.start()
logger.info(f"Bulk processor metrics: {metrics}")

logger.info(
"Number of records written: "
f"success={metrics.count_success_total}, error={metrics.count_error_total}"
)
if metrics.count_success_total == 0:
logger.warning("No data has been copied")

return True
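
For orientation, here is a hedged, standalone sketch of driving the new `BulkProcessor` outside of the MongoDB adapter. It is a sketch only: the connection URL, the `demo` table, and constructing `SQLOperation` via keyword arguments are assumptions for illustration.

import sqlalchemy as sa
from commons_codec.model import SQLOperation

from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util.database import BulkProcessor

# Capture per-record bulk results on the execution context (see the patch below).
monkeypatch_executemany()

def batch_to_operation(batch):
    # Assumption: `SQLOperation` accepts `statement` and `parameters` as keyword arguments.
    return SQLOperation(
        statement="INSERT INTO demo (id, name) VALUES (:id, :name)",
        parameters=batch,
    )

# Connection URL and target table are placeholders.
engine = sa.create_engine("crate://localhost:4200")
with engine.connect() as connection:
    processor = BulkProcessor(
        connection=connection,
        data=[[{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}]],
        batch_to_operation=batch_to_operation,
        on_error="ignore",
    )
    metrics = processor.start()
    print(metrics)  # e.g. BulkMetrics(count_success_total=2, count_error_total=0, ...)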
Empty file.
19 changes: 19 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
@@ -0,0 +1,19 @@
from sqlalchemy_cratedb import dialect


def do_executemany(self, cursor, statement, parameters, context=None):
"""
Improved version of `do_executemany` that stores its response into the request context instance.
TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
"""
result = cursor.executemany(statement, parameters)
if context is not None:
context.last_executemany_result = result


def monkeypatch_executemany():
"""
Enable improved version of `do_executemany`.
"""
dialect.do_executemany = do_executemany
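
As a hedged usage sketch of this monkeypatch: after enabling it, the raw bulk response of an `executemany`-style statement becomes available on the execution context. The connection URL and the `demo` table below are placeholders, and the shown result is an example of the driver's per-record bulk response.

import sqlalchemy as sa

from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany

# Enable the patched `do_executemany`, so the driver's bulk response is retained.
monkeypatch_executemany()

engine = sa.create_engine("crate://localhost:4200")
with engine.connect() as connection:
    cursor = connection.execute(
        sa.text("INSERT INTO demo (id, name) VALUES (:id, :name)"),
        [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}],
    )
    connection.commit()
    # With the patch applied, the per-record bulk result is available here,
    # e.g. [{"rowcount": 1}, {"rowcount": 1}].
    print(getattr(cursor.context, "last_executemany_result", None))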
157 changes: 157 additions & 0 deletions cratedb_toolkit/util/database.py
@@ -4,15 +4,21 @@
import logging
import os
import typing as t
from functools import cached_property
from pathlib import Path

import sqlalchemy as sa
import sqlparse
from attr import Factory
from attrs import define
from boltons.urlutils import URL
from commons_codec.model import SQLOperation
from cratedb_sqlparse import sqlparse as sqlparse_cratedb
from pympler.asizeof import asizeof
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql.elements import AsBoolean
from sqlalchemy_cratedb.dialect import CrateDialect
from tqdm import tqdm

from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util.data import str_contains
@@ -428,3 +434,154 @@ def get_table_names(sql: str) -> t.List[t.List[str]]:
local_names.append(table.name)
names.append(local_names)
return names


class BulkResultItem(t.TypedDict):
"""
Define the shape of a CrateDB bulk request response item.
"""

rowcount: int


@define
class BulkResponse:
"""
Manage CrateDB bulk request responses.
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
TODO: Think about refactoring this to `sqlalchemy_cratedb.support`.
"""

parameters: t.Union[t.List[t.Dict[str, t.Any]], None]
cratedb_bulk_result: t.Union[t.List[BulkResultItem], None]

@cached_property
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
"""
Compute list of failed records.
CrateDB signals failed insert using `rowcount=-2`.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
"""
if self.parameters is None or self.cratedb_bulk_result is None:
return []
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.parameters, self.cratedb_bulk_result):
if status["rowcount"] == -2:
errors.append(record)
return errors

@cached_property
def parameter_count(self) -> int:
"""
Compute bulk size / length of parameter list.
"""
if not self.parameters:
return 0

return len(self.parameters)

@cached_property
def success_count(self) -> int:
"""
Compute number of succeeding records within a batch.
"""
return self.parameter_count - self.failed_count

@cached_property
def failed_count(self) -> int:
"""
Compute number of failed records within a batch.
"""
return len(self.failed_records)


@define
class BulkMetrics:
count_success_total: int = 0
count_error_total: int = 0
bytes_write_total: int = 0
bytes_error_total: int = 0


@define
class BulkProcessor:
connection: sa.Connection
data: t.Iterable[t.List[t.Dict[str, t.Any]]]
batch_to_operation: t.Callable[[t.List[t.Dict[str, t.Any]]], SQLOperation]
progress_bar: t.Union[tqdm, None] = None
on_error: t.Literal["ignore", "raise"] = "ignore"
debug: bool = False

_metrics: BulkMetrics = Factory(BulkMetrics)

@cached_property
def log_level(self):
if self.debug:
return logger.exception

else:
return logger.warning


def start(self) -> BulkMetrics:
# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
for batch in self.data:
current_batch_size = len(batch)

self.progress_bar and self.progress_bar.set_description("ACQUIRE")

try:
operation = self.batch_to_operation(batch)
except Exception as ex:
self.log_level(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
continue


self._metrics.bytes_write_total += asizeof(operation)
statement = sa.text(operation.statement)

# Submit operation to CrateDB, using `bulk_args`.
self.progress_bar and self.progress_bar.set_description("SUBMIT ")
try:
cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
self.connection.commit()
cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
self._metrics.count_success_total += bulk_response.success_count
self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)

# When a batch is of size one, an exception is raised.
# Signal the same condition as if the whole batch had failed.
except ProgrammingError:
failed_records = [operation.parameters]
count_success_local = 0


# When bulk operations fail, try inserting failed records record-by-record,
# in order to relay proper error messages to the user.
if failed_records:
logger.warning(
f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
f"Falling back to per-record operations."
)
for record in failed_records:
try:
self.connection.execute(statement=statement, parameters=record)
self.connection.commit()
self._metrics.count_success_total += 1

except Exception as ex:
logger.warning(f"Operation failed: {ex}")
logger.debug(f"Failing record: {record}")
self._metrics.count_error_total += 1
self._metrics.bytes_error_total += asizeof(record)
if self.on_error == "raise":
raise

self.progress_bar and self.progress_bar.update(n=1)

self.progress_bar and self.progress_bar.close()

return self._metrics
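
To illustrate the bookkeeping, here is a small self-contained sketch of how `BulkResponse` separates failed records from a CrateDB bulk result. The sample parameters and result are made up; `rowcount: -2` marks a rejected record, per the CrateDB bulk operations documentation.

from cratedb_toolkit.util.database import BulkResponse

# Hypothetical bulk insert: two parameter records, the second one rejected.
parameters = [
    {"id": 1, "name": "Foo"},
    {"id": 2, "name": "Bar"},
]
cratedb_bulk_result = [{"rowcount": 1}, {"rowcount": -2}]

response = BulkResponse(parameters, cratedb_bulk_result)
print(response.parameter_count)  # 2
print(response.success_count)    # 1
print(response.failed_count)     # 1
print(response.failed_records)   # [{'id': 2, 'name': 'Bar'}]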
1 change: 1 addition & 0 deletions pyproject.toml
@@ -95,6 +95,7 @@ dependencies = [
'importlib-metadata; python_version < "3.8"',
'importlib-resources; python_version < "3.9"',
"polars<1.7",
"pympler<1.2",
"python-dateutil<3",
"python-dotenv<2",
"python-slugify<9",
2 changes: 2 additions & 0 deletions tests/io/mongodb/mixed.ndjson
@@ -0,0 +1,2 @@
{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}}
{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]}
22 changes: 21 additions & 1 deletion tests/io/mongodb/test_copy.py
Expand Up @@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
assert cratedb.database.count_records("testdrive.books-relaxed") == 4


def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
"""
@@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer, which should emit a warning about an invalid record.
"""

# Define source and target URLs.
json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Run transfer command.
mongodb_copy(json_resource, cratedb_url)

# Verify metadata in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 1

assert "Dynamic nested arrays are not supported" in caplog.text


def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
