Skip to content

Commit

Permalink
schema-reader: Log the erroring kafka message key
Browse files Browse the repository at this point in the history
- we add the kafka message key to the log in the case of an error,
this makes it easier to debug issues
- we also add a check for kafka messages without a key
  • Loading branch information
nosahama committed Sep 25, 2024
1 parent 0574efb commit 515f785
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
7 changes: 6 additions & 1 deletion karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:
assert message_key is not None
key = json_decode(message_key)
except JSONDecodeError as exc:
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
LOG.warning("Invalid JSON in msg.key(): %s at offset %s", msg.key().decode(), msg.offset())
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue # [non-strict mode]
except AssertionError as exc:
LOG.warning("Empty msg.key() at offset %s", msg.offset())
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
continue # [non-strict mode]
Expand Down
13 changes: 11 additions & 2 deletions tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
ok1_message.value.return_value = schema_str
ok1_message.offset.return_value = 1
invalid_key_message = Mock(spec=Message)
invalid_key_message.key.return_value = "invalid-key"
invalid_key_message.key.return_value = b"invalid-key"
invalid_key_message.error.return_value = None
invalid_key_message.value.return_value = schema_str
invalid_key_message.offset.return_value = 2
Expand Down Expand Up @@ -388,7 +388,16 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message:
schema_type=None,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Invalid JSON in msg.key() at offset 1",
expected_log_message='Invalid JSON in msg.key(): {subject1::::"test""version":1"magic":1} at offset 1',
),
KafkaMessageHandlingErrorTestCase(
test_name="Message key is empty, i.e. `null/None`",
key=None,
value=b'{"value": "value does not matter at this stage, just correct JSON"}',
schema_type=None,
message_type=MessageType.schema,
expected_error=CorruptKafkaRecordException,
expected_log_message="Empty msg.key() at offset 1",
),
KafkaMessageHandlingErrorTestCase(
test_name="Keytype is missing from message key",
Expand Down

0 comments on commit 515f785

Please sign in to comment.