diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index dfefe29e6..4519354f7 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -364,7 +364,12 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None: assert message_key is not None key = json_decode(message_key) except JSONDecodeError as exc: - LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset()) + LOG.warning("Invalid JSON in msg.key(): %s at offset %s", msg.key().decode(), msg.offset()) + self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress. + self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) + continue # [non-strict mode] + except AssertionError as exc: + LOG.warning("Empty msg.key() at offset %s", msg.offset()) self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress. self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc) continue # [non-strict mode] diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 680bc6446..5d625931b 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -213,7 +213,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche ok1_message.value.return_value = schema_str ok1_message.offset.return_value = 1 invalid_key_message = Mock(spec=Message) - invalid_key_message.key.return_value = "invalid-key" + invalid_key_message.key.return_value = b"invalid-key" invalid_key_message.error.return_value = None invalid_key_message.value.return_value = schema_str invalid_key_message.offset.return_value = 2 @@ -388,7 +388,16 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message: schema_type=None, message_type=MessageType.schema, expected_error=CorruptKafkaRecordException, - expected_log_message="Invalid JSON in msg.key() at offset 1", + expected_log_message='Invalid JSON in msg.key(): {subject1::::"test""version":1"magic":1} at offset 1', + ), + KafkaMessageHandlingErrorTestCase( + test_name="Message key is empty, i.e. `null/None`", + key=None, + value=b'{"value": "value does not matter at this stage, just correct JSON"}', + schema_type=None, + message_type=MessageType.schema, + expected_error=CorruptKafkaRecordException, + expected_log_message="Empty msg.key() at offset 1", ), KafkaMessageHandlingErrorTestCase( test_name="Keytype is missing from message key",