From 1cdd0f18fbe6ac8381d961489cd2202d7d2a7f97 Mon Sep 17 00:00:00 2001
From: Emmanuel Evbuomwan
Date: Wed, 21 Aug 2024 15:13:15 +0200
Subject: [PATCH 1/2] schema-reader: Shut down service on corrupt entries in
 `_schemas` topic

Previously, when we encountered errors within the `_schemas` topic, we
would continue loading messages and skip the problematic schema. This is
not ideal, as it might leave the application with corrupt schema data,
and the side effects could be grave. We now stop the service, log the
error and allow a graceful shutdown. We will follow up this work by
adding metrics for such cases.
---
 karapace/errors.py                            | 11 ++++
 karapace/schema_reader.py                     | 64 ++++++++++---------
 karapace/utils.py                             |  9 +++
 .../integration/backup/test_legacy_backup.py  | 12 +++-
 tests/unit/test_schema_reader.py              | 31 +++++----
 tests/unit/test_utils.py                      | 19 +++++-
 6 files changed, 102 insertions(+), 44 deletions(-)

diff --git a/karapace/errors.py b/karapace/errors.py
index b5c3ced38..ab8fb31cb 100644
--- a/karapace/errors.py
+++ b/karapace/errors.py
@@ -73,3 +73,14 @@ class SubjectSoftDeletedException(Exception):

 class SchemaTooLargeException(Exception):
     pass
+
+
+class ShutdownException(Exception):
+    """Raised when the service has encountered an error on which it should not continue and should shut down."""
+
+
+class CorruptKafkaRecordException(ShutdownException):
+    """
+    Raised when a corrupt schema is present in the `_schemas` topic. This should halt the service, as
+    continuing would leave us with a corrupt state that could lead to various runtime issues and data mismatches.
+    """

diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index f36b85f67..e3884aac5 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -28,19 +28,20 @@
 from karapace.config import Config
 from karapace.coordinator.master_coordinator import MasterCoordinator
 from karapace.dependency import Dependency
-from karapace.errors import InvalidReferences, InvalidSchema
+from karapace.errors import CorruptKafkaRecordException, InvalidReferences, InvalidSchema, ShutdownException
 from karapace.in_memory_database import InMemoryDatabase
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.common import translate_from_kafkaerror
 from karapace.kafka.consumer import KafkaConsumer
 from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
 from karapace.offset_watcher import OffsetWatcher
+from karapace.protobuf.exception import ProtobufException
 from karapace.protobuf.schema import ProtobufSchema
 from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
 from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
 from karapace.statsd import StatsClient
 from karapace.typing import JsonObject, SchemaId, Subject, Version
-from karapace.utils import json_decode, JSONDecodeError
+from karapace.utils import json_decode, JSONDecodeError, shutdown
 from threading import Event, Thread
 from typing import Final, Mapping, Sequence

@@ -189,7 +190,7 @@ def run(self) -> None:
             except Exception as e:  # pylint: disable=broad-except
                 LOG.exception("[Admin Client] Unexpected exception. 
Retrying") self.stats.unexpected_exception(ex=e, where="admin_client_instantiation") - self._stop_schema_reader.wait(timeout=2.0) + self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) assert self.admin_client is not None @@ -199,10 +200,11 @@ def run(self) -> None: stack.enter_context(closing(self.consumer)) except (NodeNotReadyError, NoBrokersAvailable, AssertionError): LOG.warning("[Consumer] No Brokers available yet. Retrying") - self._stop_schema_reader.wait(timeout=2.0) + self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) except KafkaConfigurationError: LOG.info("[Consumer] Invalid configuration. Bailing") - raise + self._stop_schema_reader.set() + shutdown() except Exception as e: # pylint: disable=broad-except LOG.exception("[Consumer] Unexpected exception. Retrying") self.stats.unexpected_exception(ex=e, where="consumer_instantiation") @@ -242,6 +244,9 @@ def run(self) -> None: self.offset = self._get_beginning_offset() try: self.handle_messages() + except ShutdownException: + self._stop_schema_reader.set() + shutdown() except Exception as e: # pylint: disable=broad-except self.stats.unexpected_exception(ex=e, where="schema_reader_loop") LOG.exception("Unexpected exception in schema reader loop") @@ -352,11 +357,9 @@ def handle_messages(self) -> None: assert message_key is not None key = json_decode(message_key) - except JSONDecodeError: - # Invalid entry shall also move the offset so Karapace makes progress towards ready state. - self.offset = msg.offset() + except JSONDecodeError as exc: LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset()) - continue + raise CorruptKafkaRecordException from exc except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc: LOG.error( "Kafka authorization error when consuming from %s: %s %s", @@ -364,7 +367,7 @@ def handle_messages(self) -> None: exc, msg.error(), ) - continue + raise ShutdownException from exc assert isinstance(key, dict) msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE @@ -381,14 +384,15 @@ def handle_messages(self) -> None: if message_value: try: value = self._parse_message_value(message_value) - except JSONDecodeError: - # Invalid entry shall also move the offset so Karapace makes progress towards ready state. 
-                    self.offset = msg.offset()
+                except (JSONDecodeError, TypeError) as exc:
                     LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
-                    continue
+                    raise CorruptKafkaRecordException from exc

-            self.handle_msg(key, value)
-            self.offset = msg.offset()
+            try:
+                self.handle_msg(key, value)
+                self.offset = msg.offset()
+            except (InvalidSchema, TypeError) as exc:
+                raise CorruptKafkaRecordException from exc

             if msg_keymode == KeyMode.CANONICAL:
                 schema_records_processed_keymode_canonical += 1
@@ -512,9 +516,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

         try:
             schema_type_parsed = SchemaType(schema_type)
-        except ValueError:
+        except ValueError as exc:
             LOG.warning("Invalid schema type: %s", schema_type)
-            return
+            raise InvalidSchema from exc

         # This does two jobs:
         # - Validates the schema's JSON
@@ -528,9 +532,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
         if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
             try:
                 schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
-            except json.JSONDecodeError:
+            except json.JSONDecodeError as exc:
                 LOG.warning("Schema is not valid JSON")
-                return
+                raise InvalidSchema from exc
         elif schema_type_parsed == SchemaType.PROTOBUF:
             try:
                 if schema_references:
@@ -544,12 +548,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
                         normalize=False,
                     )
                     schema_str = str(parsed_schema)
-            except InvalidSchema:
-                LOG.exception("Schema is not valid ProtoBuf definition")
-                return
-            except InvalidReferences:
-                LOG.exception("Invalid Protobuf references")
-                return
+            except (InvalidSchema, ProtobufException) as exc:
+                LOG.warning("Schema is not a valid ProtoBuf definition")
+                raise InvalidSchema from exc
+            except InvalidReferences as exc:
+                LOG.warning("Invalid Protobuf references")
+                raise InvalidSchema from exc

         try:
             typed_schema = TypedSchema(
@@ -559,8 +563,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
                 dependencies=resolved_dependencies,
                 schema=parsed_schema,
             )
-        except (InvalidSchema, JSONDecodeError):
-            return
+        except (InvalidSchema, JSONDecodeError) as exc:
+            raise InvalidSchema from exc

         self.database.insert_schema_version(
             subject=schema_subject,
@@ -588,13 +592,15 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
                     self._handle_msg_delete_subject(key, value)
                 elif message_type == MessageType.no_operation:
                     pass
-        except (KeyError, ValueError):
+        except (KeyError, ValueError) as exc:
             LOG.warning("The message %r-%r has been discarded because the %s is not managed", key, value, key["keytype"])
+            raise InvalidSchema("Unrecognized `keytype` within schema") from exc
         else:
             LOG.warning(
                 "The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
             )
+            raise InvalidSchema("Message key doesn't contain the `keytype` attribute")

     def remove_referenced_by(
         self,

diff --git a/karapace/utils.py b/karapace/utils.py
index 96f87da8c..071b3e9d3 100644
--- a/karapace/utils.py
+++ b/karapace/utils.py
@@ -20,6 +20,7 @@

 import importlib
 import logging
+import signal
 import time

 if importlib.util.find_spec("ujson"):
@@ -256,3 +257,11 @@ def remove_prefix(string: str, prefix: str) -> str:
         i += 1

     return string[i:]
+
+
+def shutdown():
+    """
+    Send a SIGTERM to the currently running application process, which should initiate the shutdown logic. 
+    """
+    LOG.warning("=======> Sending shutdown signal `SIGTERM` to Application process <=======")
+    signal.raise_signal(signal.SIGTERM)

diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py
index 08076dbde..e78c07f6a 100644
--- a/tests/integration/backup/test_legacy_backup.py
+++ b/tests/integration/backup/test_legacy_backup.py
@@ -4,6 +4,7 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
+from aiohttp.client_exceptions import ClientError
 from aiokafka.errors import InvalidTopicError
 from karapace.backup import api
 from karapace.backup.api import BackupVersion
@@ -236,9 +237,14 @@ async def test_backup_restore(
         topic_name=api.normalize_topic_name(None, config),
     )
     time.sleep(1.0)
-    res = await registry_async_client.get(f"subjects/{subject}/versions")
-    assert res.status_code == 200
-    assert res.json() == [1]
+
+    # Restoring a `v1` backup with an invalid schema stops the service as expected, but it is
+    # unclear why the logic differs between backup versions; this needs further investigation.
+    if backup_file_version == "v1":
+        with pytest.raises(ClientError):
+            await registry_async_client.get(f"subjects/{subject}/versions")
+    else:
+        await registry_async_client.get(f"subjects/{subject}/versions")

     _assert_canonical_key_format(
         bootstrap_servers=kafka_servers.bootstrap_servers, schemas_topic=registry_cluster.schemas_topic

diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py
index afbcbb976..4dbc728af 100644
--- a/tests/unit/test_schema_reader.py
+++ b/tests/unit/test_schema_reader.py
@@ -10,6 +10,7 @@
 from confluent_kafka import Message
 from dataclasses import dataclass
 from karapace.config import DEFAULTS
+from karapace.errors import CorruptKafkaRecordException
 from karapace.in_memory_database import InMemoryDatabase
 from karapace.kafka.consumer import KafkaConsumer
 from karapace.key_format import KeyFormatter
@@ -199,7 +200,15 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
     consumer_mock = Mock(spec=KafkaConsumer)

     schema_str = json.dumps(
-        {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
+        {
+            "subject": "test",
+            "version": 1,
+            "id": 1,
+            "deleted": False,
+            "schema": json.dumps(
+                {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
+            ),
+        }
     ).encode()

     ok1_message = Mock(spec=Message)
@@ -237,16 +246,16 @@
     schema_reader.handle_messages()
     assert schema_reader.offset == 1
     assert schema_reader.ready is False
-    schema_reader.handle_messages()
-    assert schema_reader.offset == 2
-    assert schema_reader.ready is False
-    schema_reader.handle_messages()
-    assert schema_reader.offset == 3
-    assert schema_reader.ready is False
-    schema_reader.handle_messages()  # call last time to call _is_ready()
-    assert schema_reader.offset == 3
-    assert schema_reader.ready is True
-    assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
+
+    with pytest.raises(CorruptKafkaRecordException):
+        schema_reader.handle_messages()
+        assert schema_reader.offset == 1
+        assert schema_reader.ready is False
+
+    with pytest.raises(CorruptKafkaRecordException):
+        schema_reader.handle_messages()
+        assert schema_reader.offset == 1
+        assert schema_reader.ready is False


 def test_soft_deleted_schema_storing() -> None:

diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index ed5a980e1..51633376c 100644
--- 
a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -2,7 +2,11 @@
 Copyright (c) 2024 Aiven Ltd
 See LICENSE for details
 """
-from karapace.utils import remove_prefix
+from _pytest.logging import LogCaptureFixture
+from karapace.utils import remove_prefix, shutdown
+from unittest.mock import patch
+
+import logging


 def test_remove_prefix_basic() -> None:
@@ -28,3 +32,16 @@ def test_remove_prefix_multiple_occurrences_of_prefix() -> None:
 def test_remove_prefix_empty_string() -> None:
     result = remove_prefix("", "hello ")
     assert result == ""
+
+
+def test_shutdown(caplog: LogCaptureFixture) -> None:
+    with caplog.at_level(logging.WARNING, logger="karapace.utils"):
+        with patch("karapace.utils.signal") as mock_signal:
+            mock_signal.SIGTERM = 15
+
+            shutdown()
+            mock_signal.raise_signal.assert_called_once_with(15)
+            for log in caplog.records:
+                assert log.name == "karapace.utils"
+                assert log.levelname == "WARNING"
+                assert log.message == "=======> Sending shutdown signal `SIGTERM` to Application process <======="

From 2d9f1e8eb3b44e8debda6539d949fa38c90458d2 Mon Sep 17 00:00:00 2001
From: Emmanuel Evbuomwan
Date: Fri, 30 Aug 2024 15:20:14 +0200
Subject: [PATCH 2/2] schema-reader: Wrap shutdown feature around flag

We'd like the shutdown logic on corrupt schemas to be guarded by a
feature flag so we do not surprise customers; it is disabled by
default.
---
 README.rst                                    |  7 ++
 container/compose.yml                         |  4 +
 container/prometheus/rules.yml                | 21 ++---
 karapace/config.py                            |  4 +
 karapace/coordinator/master_coordinator.py    |  2 +-
 karapace/kafka_error_handler.py               | 47 +++++++++++
 karapace/schema_reader.py                     | 31 +++++---
 .../integration/backup/test_legacy_backup.py  | 12 +--
 tests/unit/test_kafka_error_handler.py        | 78 +++++++++++++++++++
 tests/unit/test_schema_reader.py              | 31 +++----
 10 files changed, 187 insertions(+), 50 deletions(-)
 create mode 100644 karapace/kafka_error_handler.py
 create mode 100644 tests/unit/test_kafka_error_handler.py

diff --git a/README.rst b/README.rst
index c600820b6..15f9dd52f 100644
--- a/README.rst
+++ b/README.rst
@@ -468,6 +468,13 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
   * - ``master_election_strategy``
     - ``lowest``
     - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
+  * - ``kafka_schema_reader_strict_mode``
+    - ``false``
+    - If enabled, causes the Karapace schema-registry service to shut down when there are invalid schema records in the `_schemas` topic
+  * - ``kafka_retriable_errors_silenced``
+    - ``true``
+    - If enabled, Kafka errors which can be retried, or custom errors specified for the service, will not be raised;
+      instead, a warning log is emitted. This will denoise issue-tracking systems, e.g. 
sentry Authentication and authorization of Karapace Schema Registry REST API diff --git a/container/compose.yml b/container/compose.yml index 511d924e8..fa2c53265 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -80,6 +80,8 @@ services: KARAPACE_COMPATIBILITY: FULL KARAPACE_STATSD_HOST: statsd-exporter KARAPACE_STATSD_PORT: 8125 + KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false + KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true karapace-rest: image: ghcr.io/aiven-open/karapace:develop @@ -106,6 +108,8 @@ services: KARAPACE_LOG_LEVEL: WARNING KARAPACE_STATSD_HOST: statsd-exporter KARAPACE_STATSD_PORT: 8125 + KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false + KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true prometheus: image: prom/prometheus diff --git a/container/prometheus/rules.yml b/container/prometheus/rules.yml index 9bdc83f0a..744fd5517 100644 --- a/container/prometheus/rules.yml +++ b/container/prometheus/rules.yml @@ -3,18 +3,19 @@ groups: rules: - record: karapace_exceptions_sum_by_exception expr: sum by (exception) (exception) - - alert: HighHTTPRequests - expr: karapace_http_requests_total > 10 - for: 5m + - alert: user_alert_karapace_high_non_success_http_requests + expr: sum by (instance) (count_over_time(karapace_http_requests_total{status!~'^2.*'}[1m])) > 5 + for: 2m labels: severity: warning annotations: - summary: High HTTP requests for (instance={{ $labels.instance }}) - description: "Service received\n HTTP Requests = {{ $value }}\n" - - alert: FireImmidiately - expr: karapace_schema_reader_schemas > 1 + summary: High failing HTTP requests for (instance={{ $labels.instance }}) + description: "Service returned too many non-success HTTP responses = {{ $value }}\n" + - alert: user_alert_karapace_frequent_restart + expr: sum by (app)(count_over_time(karapace_shutdown_count[30s])) > 1 + for: 2m labels: - severity: page + severity: critical annotations: - summary: Lots of schems on (instance={{ $labels.instance }}) - description: "\n Schema count = {{ $value }}\n" + summary: Karapace service instance={{ $labels.instance }} restarting frequently. 
+ description: "Service is experiencing frequent restarts count={{ $value }}\n" diff --git a/karapace/config.py b/karapace/config.py index 894164586..bbae62701 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -80,6 +80,8 @@ class Config(TypedDict): protobuf_runtime_directory: str statsd_host: str statsd_port: int + kafka_schema_reader_strict_mode: bool + kafka_retriable_errors_silenced: bool sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -154,6 +156,8 @@ class ConfigDefaults(Config, total=False): "protobuf_runtime_directory": "runtime", "statsd_host": "127.0.0.1", "statsd_port": 8125, + "kafka_schema_reader_strict_mode": False, + "kafka_retriable_errors_silenced": True, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/coordinator/master_coordinator.py b/karapace/coordinator/master_coordinator.py index b9fdb3a12..5abd4bd31 100644 --- a/karapace/coordinator/master_coordinator.py +++ b/karapace/coordinator/master_coordinator.py @@ -50,7 +50,7 @@ async def start(self) -> None: await self._kafka_client.bootstrap() break except KafkaConnectionError: - LOG.exception("Kafka client bootstrap failed.") + LOG.warning("Kafka client bootstrap failed.") await asyncio.sleep(0.5) while not self._kafka_client.cluster.brokers(): diff --git a/karapace/kafka_error_handler.py b/karapace/kafka_error_handler.py new file mode 100644 index 000000000..4e8d87fd7 --- /dev/null +++ b/karapace/kafka_error_handler.py @@ -0,0 +1,47 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from karapace.config import Config +from karapace.errors import CorruptKafkaRecordException +from karapace.typing import StrEnum + +import aiokafka.errors as Errors +import enum +import logging + +LOG = logging.getLogger(__name__) + + +class KafkaErrorLocation(StrEnum): + SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR" + SCHEMA_READER = "SCHEMA_READER" + + +class KafkaRetriableErrors(enum.Enum): + SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,) + + +class KafkaErrorHandler: + def __init__(self, config: Config) -> None: + self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"] + self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"] + + def log(self, location: KafkaErrorLocation, error: BaseException) -> None: + LOG.warning("%s encountered error - %s", location, error) + + def handle_schema_coordinator_error(self, error: BaseException) -> None: + if getattr(error, "retriable", False) or ( + error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value + ): + self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error) + if not self.retriable_errors_silenced: + raise error + + def handle_schema_reader_error(self, error: BaseException) -> None: + if self.schema_reader_strict_mode: + raise CorruptKafkaRecordException from error + + def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None: + return getattr(self, f"handle_{location.lower()}_error")(error=error) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index e3884aac5..9ded95651 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -28,11 +28,12 @@ from karapace.config import Config from karapace.coordinator.master_coordinator import MasterCoordinator from karapace.dependency import Dependency -from karapace.errors import CorruptKafkaRecordException, InvalidReferences, InvalidSchema, ShutdownException +from karapace.errors import InvalidReferences, InvalidSchema, 
ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import KafkaConsumer +from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.exception import ProtobufException @@ -141,6 +142,7 @@ def __init__( self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher self.stats = StatsClient(config=config) + self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) # Thread synchronization objects # - offset is used by the REST API to wait until this thread has @@ -185,7 +187,8 @@ def run(self) -> None: LOG.warning("[Admin Client] No Brokers available yet. Retrying") self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) except KafkaConfigurationError: - LOG.info("[Admin Client] Invalid configuration. Bailing") + LOG.warning("[Admin Client] Invalid configuration. Bailing") + self._stop_schema_reader.set() raise except Exception as e: # pylint: disable=broad-except LOG.exception("[Admin Client] Unexpected exception. Retrying") @@ -202,9 +205,9 @@ def run(self) -> None: LOG.warning("[Consumer] No Brokers available yet. Retrying") self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS) except KafkaConfigurationError: - LOG.info("[Consumer] Invalid configuration. Bailing") + LOG.warning("[Consumer] Invalid configuration. Bailing") self._stop_schema_reader.set() - shutdown() + raise except Exception as e: # pylint: disable=broad-except LOG.exception("[Consumer] Unexpected exception. Retrying") self.stats.unexpected_exception(ex=e, where="consumer_instantiation") @@ -249,7 +252,7 @@ def run(self) -> None: shutdown() except Exception as e: # pylint: disable=broad-except self.stats.unexpected_exception(ex=e, where="schema_reader_loop") - LOG.exception("Unexpected exception in schema reader loop") + LOG.warning("Unexpected exception in schema reader loop - %s", e) def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" @@ -359,7 +362,9 @@ def handle_messages(self) -> None: key = json_decode(message_key) except JSONDecodeError as exc: LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset()) - raise CorruptKafkaRecordException from exc + self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress. 
+                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+                continue  # [non-strict mode]
             except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
                 LOG.error(
                     "Kafka authorization error when consuming from %s: %s %s",
@@ -367,7 +372,9 @@ def handle_messages(self) -> None:
                     exc,
                     msg.error(),
                 )
-                raise ShutdownException from exc
+                if self.kafka_error_handler.schema_reader_strict_mode:
+                    raise ShutdownException from exc
+                continue

             assert isinstance(key, dict)
             msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE
@@ -386,13 +393,17 @@ def handle_messages(self) -> None:
                     value = self._parse_message_value(message_value)
                 except (JSONDecodeError, TypeError) as exc:
                     LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
-                    raise CorruptKafkaRecordException from exc
+                    self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
+                    self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+                    continue  # [non-strict mode]

             try:
                 self.handle_msg(key, value)
-                self.offset = msg.offset()
             except (InvalidSchema, TypeError) as exc:
-                raise CorruptKafkaRecordException from exc
+                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+                continue
+            finally:
+                self.offset = msg.offset()

             if msg_keymode == KeyMode.CANONICAL:
                 schema_records_processed_keymode_canonical += 1

diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py
index e78c07f6a..08076dbde 100644
--- a/tests/integration/backup/test_legacy_backup.py
+++ b/tests/integration/backup/test_legacy_backup.py
@@ -4,7 +4,6 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from aiohttp.client_exceptions import ClientError
 from aiokafka.errors import InvalidTopicError
 from karapace.backup import api
 from karapace.backup.api import BackupVersion
@@ -237,14 +236,9 @@ async def test_backup_restore(
         topic_name=api.normalize_topic_name(None, config),
     )
     time.sleep(1.0)
-
-    # Restoring a `v1` backup with an invalid schema stops the service as expected, but it is
-    # unclear why the logic differs between backup versions; this needs further investigation. 
- if backup_file_version == "v1": - with pytest.raises(ClientError): - await registry_async_client.get(f"subjects/{subject}/versions") - else: - await registry_async_client.get(f"subjects/{subject}/versions") + res = await registry_async_client.get(f"subjects/{subject}/versions") + assert res.status_code == 200 + assert res.json() == [1] _assert_canonical_key_format( bootstrap_servers=kafka_servers.bootstrap_servers, schemas_topic=registry_cluster.schemas_topic diff --git a/tests/unit/test_kafka_error_handler.py b/tests/unit/test_kafka_error_handler.py new file mode 100644 index 000000000..45e9fea1b --- /dev/null +++ b/tests/unit/test_kafka_error_handler.py @@ -0,0 +1,78 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from _pytest.logging import LogCaptureFixture +from karapace.errors import CorruptKafkaRecordException +from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation + +import aiokafka.errors as Errors +import logging +import pytest + + +@pytest.fixture(name="kafka_error_handler") +def fixture_kafka_error_handler() -> KafkaErrorHandler: + config = { + "kafka_schema_reader_strict_mode": False, + "kafka_retriable_errors_silenced": True, + } + return KafkaErrorHandler(config=config) + + +@pytest.mark.parametrize( + "retriable_error", + [ + Errors.NodeNotReadyError("node is still starting"), + Errors.GroupCoordinatorNotAvailableError("group is unavailable"), + Errors.NoBrokersAvailable("no brokers available"), + ], +) +def test_handle_error_retriable_schema_coordinator( + caplog: LogCaptureFixture, + kafka_error_handler: KafkaErrorHandler, + retriable_error: Errors.KafkaError, +): + kafka_error_handler.retriable_errors_silenced = True + with caplog.at_level(logging.WARNING, logger="karapace.error_handler"): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error) + + for log in caplog.records: + assert log.name == "karapace.kafka_error_handler" + assert log.levelname == "WARNING" + assert log.message == f"SCHEMA_COORDINATOR encountered error - {retriable_error}" + + # Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour + kafka_error_handler.retriable_errors_silenced = False + with pytest.raises(retriable_error.__class__): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error) + + +@pytest.mark.parametrize( + "nonretriable_error", + [ + ValueError("value missing"), + Errors.GroupAuthorizationFailedError("authorization failed"), + Errors.InvalidCommitOffsetSizeError("invalid commit size"), + ], +) +def test_handle_error_nonretriable_schema_coordinator( + kafka_error_handler: KafkaErrorHandler, nonretriable_error: BaseException +) -> None: + kafka_error_handler.retriable_errors_silenced = False + with pytest.raises(nonretriable_error.__class__): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error) + + # Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour + kafka_error_handler.retriable_errors_silenced = True + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error) + + +def test_handle_error_schema_reader(kafka_error_handler: KafkaErrorHandler) -> None: + kafka_error_handler.schema_reader_strict_mode = True + with pytest.raises(CorruptKafkaRecordException): + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception) + + # Check that the 
config flag - `kafka_schema_reader_strict_mode` switches the behaviour + kafka_error_handler.schema_reader_strict_mode = False + kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception) diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 4dbc728af..afbcbb976 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -10,7 +10,6 @@ from confluent_kafka import Message from dataclasses import dataclass from karapace.config import DEFAULTS -from karapace.errors import CorruptKafkaRecordException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter @@ -200,15 +199,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche consumer_mock = Mock(spec=KafkaConsumer) schema_str = json.dumps( - { - "subject": "test", - "version": 1, - "id": 1, - "deleted": False, - "schema": json.dumps( - {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]} - ), - } + {"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]} ).encode() ok1_message = Mock(spec=Message) @@ -246,16 +237,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche schema_reader.handle_messages() assert schema_reader.offset == 1 assert schema_reader.ready is False - - with pytest.raises(CorruptKafkaRecordException): - schema_reader.handle_messages() - assert schema_reader.offset == 1 - assert schema_reader.ready is False - - with pytest.raises(CorruptKafkaRecordException): - schema_reader.handle_messages() - assert schema_reader.offset == 1 - assert schema_reader.ready is False + schema_reader.handle_messages() + assert schema_reader.offset == 2 + assert schema_reader.ready is False + schema_reader.handle_messages() + assert schema_reader.offset == 3 + assert schema_reader.ready is False + schema_reader.handle_messages() # call last time to call _is_ready() + assert schema_reader.offset == 3 + assert schema_reader.ready is True + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP def test_soft_deleted_schema_storing() -> None:
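
Reviewer note, not part of either patch: the sketch below is a minimal illustration of how the two new config flags drive the `KafkaErrorHandler` introduced in PATCH 2/2. It uses only the modules added above; the bare config dict mirrors what the unit tests pass in. With `kafka_schema_reader_strict_mode` left at its default of `False`, the reader path returns silently and `handle_messages` simply skips the corrupt record, matching the pre-series behaviour.

import aiokafka.errors as Errors

from karapace.errors import CorruptKafkaRecordException
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation

# Bare dict standing in for a full Karapace config, as in the unit tests.
handler = KafkaErrorHandler(
    config={
        "kafka_schema_reader_strict_mode": True,  # opt-in: corrupt `_schemas` records halt the service
        "kafka_retriable_errors_silenced": True,  # default: retriable errors are logged, not raised
    }
)

# Schema-reader path: strict mode escalates any handled error to
# CorruptKafkaRecordException, which run() turns into a SIGTERM via shutdown().
try:
    handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=ValueError("corrupt record"))
except CorruptKafkaRecordException:
    print("strict mode: service would shut down")

# Schema-coordinator path: a retriable error is logged as a warning and
# silenced; setting kafka_retriable_errors_silenced to False would re-raise it.
handler.handle_error(
    location=KafkaErrorLocation.SCHEMA_COORDINATOR,
    error=Errors.NodeNotReadyError("node is still starting"),
)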