diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 7026ea853..680bc6446 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -26,7 +26,7 @@ from karapace.schema_type import SchemaType from karapace.typing import SchemaId, Version from tests.base_testcase import BaseTestCase -from tests.utils import schema_protobuf_invalid +from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref from typing import Callable, List, Tuple from unittest.mock import Mock @@ -485,7 +485,7 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message: key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', value=( b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false, "schema":' - + json.dumps(schema_protobuf_invalid).encode() + + json.dumps(schema_protobuf_invalid_because_corrupted).encode() + b"}" ), schema_type=SchemaType.PROTOBUF, @@ -515,3 +515,62 @@ def test_message_error_handling( assert log.name == "karapace.schema_reader" assert log.levelname == "WARNING" assert log.message == test_case.expected_log_message + + +def test_message_error_handling_with_invalid_reference_schema_protobuf( + caplog: LogCaptureFixture, + schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader], + message_factory: Callable[[bytes, bytes, int], Message], +) -> None: + # Given an invalid schema (corrupted) + key_ref = b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}' + value_ref = ( + b'{"schemaType": "PROTOBUF", "subject": "testref", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + json.dumps(schema_protobuf_invalid_because_corrupted).encode() + + b"}" + ) + message_ref = message_factory(key=key_ref, value=value_ref) + + # And given a schema referencing that corrupted schema (valid otherwise) + key_using_ref = b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}' + value_using_ref = ( + b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + json.dumps(schema_protobuf_with_invalid_ref).encode() + + b', "references": [{"name": "testref.proto", "subject": "testref", "version": 1}]' + + b"}" + ) + message_using_ref = message_factory(key=key_using_ref, value=value_using_ref) + + with caplog.at_level(logging.WARN, logger="karapace.schema_reader"): + # When handling the corrupted schema + schema_reader = schema_reader_with_consumer_messages_factory(([message_ref],)) + + # Then the schema is recognised as invalid + with pytest.raises(CorruptKafkaRecordException): + schema_reader.handle_messages() + + assert schema_reader.offset == 1 + assert not schema_reader.ready + + # When handling the schema + schema_reader.consumer.consume.side_effect = ([message_using_ref],) + + # Then the schema is recognised as invalid because of the corrupted referenced schema + with pytest.raises(CorruptKafkaRecordException): + schema_reader.handle_messages() + + assert schema_reader.offset == 1 + assert not schema_reader.ready + + warn_records = [r for r in caplog.records if r.levelname == "WARNING"] + + assert len(warn_records) == 2 + + # Check that different warnings are logged for each schema + assert warn_records[0].name == "karapace.schema_reader" + assert warn_records[0].message == "Schema is not valid ProtoBuf definition" + + assert warn_records[1].name == "karapace.schema_reader" + assert warn_records[1].message == "Invalid Protobuf references" diff --git a/tests/utils.py b/tests/utils.py index 352745f94..3757e0739 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -148,7 +148,7 @@ {"q": 3, "sensor_type": "L1", "nums": [3, 4], "order": {"item": "ABC01223"}}, ] -schema_protobuf_invalid = """ +schema_protobuf_invalid_because_corrupted = """ |o3" | |opti -- om.codingharbour.protobuf"; @@ -162,7 +162,19 @@ | HIGH = 0 | MIDDLE = ; """ -schema_protobuf_invalid = trim_margin(schema_protobuf_invalid) +schema_protobuf_invalid_because_corrupted = trim_margin(schema_protobuf_invalid_because_corrupted) + +schema_protobuf_with_invalid_ref = """ + |syntax = "proto3"; + | + |import "Message.proto"; + | + |message MessageWithInvalidRef { + | string name = 1; + | Message ref = 2; + |} + |""" +schema_protobuf_with_invalid_ref = trim_margin(schema_protobuf_with_invalid_ref) schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)}