Skip to content

Commit

Permalink
BulkResponse: Add wrapper for improved decoding of HTTP bulk responses
Browse files Browse the repository at this point in the history
CrateDB HTTP bulk responses include `rowcount=` items, either signalling
if a bulk operation succeeded or failed.

- success means `rowcount=1`
- failure means `rowcount=-2`

https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
  • Loading branch information
amotl committed Oct 31, 2024
1 parent 2b6b835 commit a016130
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Unreleased
server stacktraces into exception messages.
- Refactoring: The module namespace ``crate.client.test_util`` has been
renamed to ``crate.testing.util``.
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
responses including ``rowcount=`` items.

.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/
Expand Down
68 changes: 68 additions & 0 deletions src/crate/client/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import typing as t
from functools import cached_property


class BulkResultItem(t.TypedDict):
"""
Define the shape of a CrateDB bulk request response item.
"""

rowcount: int


class BulkResponse:
"""
Manage a response to a CrateDB bulk request.
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
"""

def __init__(
self,
records: t.List[t.Dict[str, t.Any]],
results: t.List[BulkResultItem]):
if records is None:
raise ValueError("Processing a bulk response without records is an invalid operation")
if results is None:
raise ValueError("Processing a bulk response without results is an invalid operation")
self.records = records
self.results = results

@cached_property
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
"""
Compute list of failed records.
CrateDB signals failed inserts using `rowcount=-2`.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
"""
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.records, self.results):
if status["rowcount"] == -2:
errors.append(record)
return errors

@cached_property
def record_count(self) -> int:
"""
Compute bulk size / length of parameter list.
"""
if not self.records:
return 0

Check warning on line 53 in src/crate/client/result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/result.py#L53

Added line #L53 was not covered by tests
return len(self.records)

@cached_property
def success_count(self) -> int:
"""
Compute number of succeeding records within a batch.
"""
return self.record_count - self.failed_count

@cached_property
def failed_count(self) -> int:
"""
Compute number of failed records within a batch.
"""
return len(self.failed_records)
88 changes: 88 additions & 0 deletions tests/client/test_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import sys
import unittest

from crate import client
from crate.client.exceptions import ProgrammingError
from .layer import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
from .settings import crate_host


class BulkOperationTest(unittest.TestCase):

def setUp(self):
setUpCrateLayerBaseline(self)

def tearDown(self):
tearDownDropEntitiesBaseline(self)

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_with_bulk_response_partial(self):

# Import at runtime is on purpose, to permit skipping the test case.
from crate.client.result import BulkResponse

connection = client.connect(crate_host)
cursor = connection.cursor()

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

# Run a batch insert that only partially succeeds.
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)

# Verify CrateDB response.
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])

# Verify decoded response.
bulk_response = BulkResponse(invalid_records, result)
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
self.assertEqual(bulk_response.record_count, 2)
self.assertEqual(bulk_response.success_count, 1)
self.assertEqual(bulk_response.failed_count, 1)

cursor.execute("REFRESH TABLE foobar;")
cursor.execute("SELECT * FROM foobar;")
result = cursor.fetchall()
self.assertEqual(result, [[1, "Hotzenplotz 1"]])

cursor.close()
connection.close()

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_empty(self):

connection = client.connect(crate_host)
cursor = connection.cursor()

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

# Run a batch insert that is empty.
with self.assertRaises(ProgrammingError) as cm:
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
self.assertEqual(
str(cm.exception),
"SQLParseException[The query contains a parameter placeholder $1, "
"but there are only 0 parameter values]")

cursor.close()
connection.close()

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_bulk_response_empty_records_or_results(self):

# Import at runtime is on purpose, to permit skipping the test case.
from crate.client.result import BulkResponse

with self.assertRaises(ValueError) as cm:
BulkResponse(records=None, results=None)
self.assertEqual(
str(cm.exception),
"Processing a bulk response without records is an invalid operation")

with self.assertRaises(ValueError) as cm:
BulkResponse(records=[], results=None)
self.assertEqual(
str(cm.exception),
"Processing a bulk response without results is an invalid operation")
5 changes: 5 additions & 0 deletions tests/client/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
TestDefaultSchemaHeader
from .layer import makeSuite, setUpWithHttps, HttpsTestServerLayer, setUpCrateLayerBaseline, \
tearDownDropEntitiesBaseline, ensure_cratedb_layer
from .test_result import BulkOperationTest


def test_suite():
Expand Down Expand Up @@ -51,6 +52,10 @@ def test_suite():
# Integration tests.
layer = ensure_cratedb_layer()

s = makeSuite(BulkOperationTest)
s.layer = layer
suite.addTest(s)

s = doctest.DocFileSuite(
'docs/by-example/http.rst',
'docs/by-example/client.rst',
Expand Down

0 comments on commit a016130

Please sign in to comment.