Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BulkResponse wrapper for improved decoding of HTTP bulk responses #649

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Unreleased
"Threads may share the module, but not connections."
- Added ``error_trace`` to string representation of an Error to relay
server stacktraces into exception messages.
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
responses including ``rowcount=`` items.

.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/
Expand Down
68 changes: 68 additions & 0 deletions src/crate/client/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import typing as t
from functools import cached_property


class BulkResultItem(t.TypedDict):
"""
Define the shape of a CrateDB bulk request response item.
"""

rowcount: int


class BulkResponse:
"""
Manage a response to a CrateDB bulk request.
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.

https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
"""

def __init__(
self,
records: t.List[t.Dict[str, t.Any]],
results: t.List[BulkResultItem]):
if records is None:
raise ValueError("Processing a bulk response without records is an invalid operation")
if results is None:
raise ValueError("Processing a bulk response without results is an invalid operation")
self.records = records
self.results = results

@cached_property
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
"""
Compute list of failed records.

CrateDB signals failed inserts using `rowcount=-2`.

https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
"""
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.records, self.results):
if status["rowcount"] == -2:
errors.append(record)
return errors

@cached_property
def record_count(self) -> int:
"""
Compute bulk size / length of parameter list.
"""
if not self.records:
return 0

Check warning on line 53 in src/crate/client/result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/result.py#L53

Added line #L53 was not covered by tests
return len(self.records)

@cached_property
def success_count(self) -> int:
"""
Compute number of succeeding records within a batch.
"""
return self.record_count - self.failed_count

@cached_property
def failed_count(self) -> int:
"""
Compute number of failed records within a batch.
"""
return len(self.failed_records)
88 changes: 88 additions & 0 deletions src/crate/client/test_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import sys
import unittest

from crate import client
from crate.client.exceptions import ProgrammingError
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
from crate.testing.settings import crate_host


class BulkOperationTest(unittest.TestCase):

def setUp(self):
setUpCrateLayerBaseline(self)

def tearDown(self):
tearDownDropEntitiesBaseline(self)

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_with_bulk_response_partial(self):

# Import at runtime is on purpose, to permit skipping the test case.
from crate.client.result import BulkResponse

connection = client.connect(crate_host)
cursor = connection.cursor()

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

# Run a batch insert that only partially succeeds.
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)

# Verify CrateDB response.
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])

# Verify decoded response.
bulk_response = BulkResponse(invalid_records, result)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this more carefully, I don't like that BulkResponse is something you need con construct manually. Couldn't we directly return it from the insert execution, instead of a list of BulkResultItem?

self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
self.assertEqual(bulk_response.record_count, 2)
self.assertEqual(bulk_response.success_count, 1)
self.assertEqual(bulk_response.failed_count, 1)

cursor.execute("REFRESH TABLE foobar;")
cursor.execute("SELECT * FROM foobar;")
result = cursor.fetchall()
self.assertEqual(result, [[1, "Hotzenplotz 1"]])

cursor.close()
connection.close()

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_empty(self):

connection = client.connect(crate_host)
cursor = connection.cursor()

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

# Run a batch insert that is empty.
with self.assertRaises(ProgrammingError) as cm:
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
self.assertEqual(
str(cm.exception),
"SQLParseException[The query contains a parameter placeholder $1, "
"but there are only 0 parameter values]")

cursor.close()
connection.close()

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_bulk_response_empty_records_or_results(self):

# Import at runtime is on purpose, to permit skipping the test case.
from crate.client.result import BulkResponse

with self.assertRaises(ValueError) as cm:
BulkResponse(records=None, results=None)
self.assertEqual(
str(cm.exception),
"Processing a bulk response without records is an invalid operation")

with self.assertRaises(ValueError) as cm:
BulkResponse(records=[], results=None)
self.assertEqual(
str(cm.exception),
"Processing a bulk response without results is an invalid operation")
Loading