diff --git a/README.rst b/README.rst
index c600820b6..15f9dd52f 100644
--- a/README.rst
+++ b/README.rst
@@ -468,6 +468,13 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
    * - ``master_election_strategy``
      - ``lowest``
      - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
+   * - ``kafka_schema_reader_strict_mode``
+     - ``false``
+     - If enabled, causes the Karapace schema-registry service to shut down when there are invalid schema records in the ``_schemas`` topic
+   * - ``kafka_retriable_errors_silenced``
+     - ``true``
+     - If enabled, Kafka errors which can be retried or custom errors specified for the service will not be raised;
+       instead, a warning log is emitted. This will denoise issue tracking systems, e.g. Sentry
 
 
 Authentication and authorization of Karapace Schema Registry REST API
diff --git a/container/compose.yml b/container/compose.yml
index 511d924e8..fa2c53265 100644
--- a/container/compose.yml
+++ b/container/compose.yml
@@ -80,6 +80,8 @@ services:
       KARAPACE_COMPATIBILITY: FULL
       KARAPACE_STATSD_HOST: statsd-exporter
       KARAPACE_STATSD_PORT: 8125
+      KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
+      KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true
 
   karapace-rest:
     image: ghcr.io/aiven-open/karapace:develop
@@ -106,6 +108,8 @@
       KARAPACE_LOG_LEVEL: WARNING
       KARAPACE_STATSD_HOST: statsd-exporter
       KARAPACE_STATSD_PORT: 8125
+      KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
+      KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true
 
   prometheus:
     image: prom/prometheus
diff --git a/container/prometheus/rules.yml b/container/prometheus/rules.yml
index 9bdc83f0a..744fd5517 100644
--- a/container/prometheus/rules.yml
+++ b/container/prometheus/rules.yml
@@ -3,18 +3,19 @@ groups:
     rules:
       - record: karapace_exceptions_sum_by_exception
         expr: sum by (exception) (exception)
-      - alert: HighHTTPRequests
-        expr: karapace_http_requests_total > 10
-        for: 5m
+      - alert: user_alert_karapace_high_non_success_http_requests
+        expr: sum by (instance) (count_over_time(karapace_http_requests_total{status!~'^2.*'}[1m])) > 5
+        for: 2m
         labels:
           severity: warning
         annotations:
-          summary: High HTTP requests for (instance={{ $labels.instance }})
-          description: "Service received\n HTTP Requests = {{ $value }}\n"
-      - alert: FireImmidiately
-        expr: karapace_schema_reader_schemas > 1
+          summary: High failing HTTP requests for (instance={{ $labels.instance }})
+          description: "Service returned too many non-success HTTP responses = {{ $value }}\n"
+      - alert: user_alert_karapace_frequent_restart
+        expr: sum by (app)(count_over_time(karapace_shutdown_count[30s])) > 1
+        for: 2m
         labels:
-          severity: page
+          severity: critical
         annotations:
-          summary: Lots of schems on (instance={{ $labels.instance }})
-          description: "\n Schema count = {{ $value }}\n"
+          summary: Karapace service instance={{ $labels.instance }} restarting frequently.
+          description: "Service is experiencing frequent restarts count={{ $value }}\n"
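A note on the two new options documented above: they are plain booleans defaulting to ``false`` and ``true`` respectively, and the compose file passes them in through the ``KARAPACE_``-prefixed environment variables. As a rough, hypothetical sketch (only the new keys are shown; a real configuration carries many more entries):

```python
# Illustrative only: the two keys added in this change, with the defaults
# added to karapace/config.py below. Everything else in the mapping is elided.
config = {
    # ... bootstrap_uri, topic_name, and the rest of the service settings ...
    "kafka_schema_reader_strict_mode": False,  # True => shut down on corrupt `_schemas` records
    "kafka_retriable_errors_silenced": True,   # True => log retriable Kafka errors instead of raising
}
```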
diff --git a/karapace/config.py b/karapace/config.py
index 894164586..bbae62701 100644
--- a/karapace/config.py
+++ b/karapace/config.py
@@ -80,6 +80,8 @@ class Config(TypedDict):
     protobuf_runtime_directory: str
     statsd_host: str
     statsd_port: int
+    kafka_schema_reader_strict_mode: bool
+    kafka_retriable_errors_silenced: bool
 
     sentry: NotRequired[Mapping[str, object]]
     tags: NotRequired[Mapping[str, object]]
@@ -154,6 +156,8 @@ class ConfigDefaults(Config, total=False):
     "protobuf_runtime_directory": "runtime",
     "statsd_host": "127.0.0.1",
     "statsd_port": 8125,
+    "kafka_schema_reader_strict_mode": False,
+    "kafka_retriable_errors_silenced": True,
 }
 
 SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]
diff --git a/karapace/coordinator/master_coordinator.py b/karapace/coordinator/master_coordinator.py
index b9fdb3a12..5abd4bd31 100644
--- a/karapace/coordinator/master_coordinator.py
+++ b/karapace/coordinator/master_coordinator.py
@@ -50,7 +50,7 @@ async def start(self) -> None:
                 await self._kafka_client.bootstrap()
                 break
             except KafkaConnectionError:
-                LOG.exception("Kafka client bootstrap failed.")
+                LOG.warning("Kafka client bootstrap failed.")
                 await asyncio.sleep(0.5)
 
         while not self._kafka_client.cluster.brokers():
diff --git a/karapace/errors.py b/karapace/errors.py
index b5c3ced38..ab8fb31cb 100644
--- a/karapace/errors.py
+++ b/karapace/errors.py
@@ -73,3 +73,14 @@ class SubjectSoftDeletedException(Exception):
 
 class SchemaTooLargeException(Exception):
     pass
+
+
+class ShutdownException(Exception):
+    """Raised when the service has encountered an error where it should not continue and should shut down."""
+
+
+class CorruptKafkaRecordException(ShutdownException):
+    """
+    Raised when a corrupt schema record is present in the `_schemas` topic. This should halt the service,
+    as continuing would leave it with corrupt state and could lead to various runtime issues and data mismatches.
+    """
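To make the intended control flow of the new exceptions concrete, here is a minimal sketch (the record-processing function is illustrative, not Karapace code): a reader raises `CorruptKafkaRecordException`, and the surrounding loop catches the `ShutdownException` base class and triggers shutdown, mirroring the `run()` changes in `karapace/schema_reader.py` further down.

```python
# Minimal sketch under the assumptions stated above; only the imported names
# come from this change, the rest is illustrative.
from karapace.errors import CorruptKafkaRecordException, ShutdownException
from karapace.utils import shutdown


def process_records(records) -> None:
    for record in records:
        if record.get("schema") is None:  # stand-in for "corrupt record detected"
            raise CorruptKafkaRecordException("corrupt record in `_schemas`")


def run_once(records) -> None:
    try:
        process_records(records)
    except ShutdownException:
        # Catching the base class covers every "stop the service" error,
        # including CorruptKafkaRecordException.
        shutdown()
```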
diff --git a/karapace/kafka_error_handler.py b/karapace/kafka_error_handler.py
new file mode 100644
index 000000000..4e8d87fd7
--- /dev/null
+++ b/karapace/kafka_error_handler.py
@@ -0,0 +1,47 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+
+from karapace.config import Config
+from karapace.errors import CorruptKafkaRecordException
+from karapace.typing import StrEnum
+
+import aiokafka.errors as Errors
+import enum
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+class KafkaErrorLocation(StrEnum):
+    SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR"
+    SCHEMA_READER = "SCHEMA_READER"
+
+
+class KafkaRetriableErrors(enum.Enum):
+    SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,)
+
+
+class KafkaErrorHandler:
+    def __init__(self, config: Config) -> None:
+        self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"]
+        self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"]
+
+    def log(self, location: KafkaErrorLocation, error: BaseException) -> None:
+        LOG.warning("%s encountered error - %s", location, error)
+
+    def handle_schema_coordinator_error(self, error: BaseException) -> None:
+        if getattr(error, "retriable", False) or (
+            error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value
+        ):
+            self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error)
+        if not self.retriable_errors_silenced:
+            raise error
+
+    def handle_schema_reader_error(self, error: BaseException) -> None:
+        if self.schema_reader_strict_mode:
+            raise CorruptKafkaRecordException from error
+
+    def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None:
+        return getattr(self, f"handle_{location.lower()}_error")(error=error)
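A quick usage sketch of the handler above (constructed directly here for illustration; in the service it is built from the full config): `handle_error` dispatches on the `KafkaErrorLocation` value, so retriable coordinator errors are logged and, with `kafka_retriable_errors_silenced` enabled, swallowed, while any schema-reader error under strict mode is escalated to `CorruptKafkaRecordException`.

```python
import aiokafka.errors as Errors

from karapace.errors import CorruptKafkaRecordException
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation

# The handler only reads these two keys from the config mapping.
handler = KafkaErrorHandler(
    config={
        "kafka_schema_reader_strict_mode": True,
        "kafka_retriable_errors_silenced": True,
    }
)

# Coordinator side: a retriable error is logged as a warning and silenced.
handler.handle_error(
    location=KafkaErrorLocation.SCHEMA_COORDINATOR,
    error=Errors.NodeNotReadyError("broker still starting"),
)

# Reader side: with strict mode on, any error is escalated to a shutdown-worthy exception.
try:
    handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=ValueError("corrupt record"))
except CorruptKafkaRecordException:
    print("strict mode: reader error escalated to CorruptKafkaRecordException")
```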
diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index f36b85f67..9ded95651 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -28,19 +28,21 @@
 from karapace.config import Config
 from karapace.coordinator.master_coordinator import MasterCoordinator
 from karapace.dependency import Dependency
-from karapace.errors import InvalidReferences, InvalidSchema
+from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
 from karapace.in_memory_database import InMemoryDatabase
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.common import translate_from_kafkaerror
 from karapace.kafka.consumer import KafkaConsumer
+from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
 from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
 from karapace.offset_watcher import OffsetWatcher
+from karapace.protobuf.exception import ProtobufException
 from karapace.protobuf.schema import ProtobufSchema
 from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
 from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
 from karapace.statsd import StatsClient
 from karapace.typing import JsonObject, SchemaId, Subject, Version
-from karapace.utils import json_decode, JSONDecodeError
+from karapace.utils import json_decode, JSONDecodeError, shutdown
 from threading import Event, Thread
 from typing import Final, Mapping, Sequence
 
@@ -140,6 +142,7 @@ def __init__(
         self.consumer: KafkaConsumer | None = None
         self._offset_watcher = offset_watcher
         self.stats = StatsClient(config=config)
+        self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config)
 
         # Thread synchronization objects
         # - offset is used by the REST API to wait until this thread has
@@ -184,12 +187,13 @@ def run(self) -> None:
                     LOG.warning("[Admin Client] No Brokers available yet. Retrying")
                     self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
                 except KafkaConfigurationError:
-                    LOG.info("[Admin Client] Invalid configuration. Bailing")
+                    LOG.warning("[Admin Client] Invalid configuration. Bailing")
+                    self._stop_schema_reader.set()
                     raise
                 except Exception as e:  # pylint: disable=broad-except
                     LOG.exception("[Admin Client] Unexpected exception. Retrying")
                     self.stats.unexpected_exception(ex=e, where="admin_client_instantiation")
-                    self._stop_schema_reader.wait(timeout=2.0)
+                    self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
 
             assert self.admin_client is not None
 
@@ -199,9 +203,10 @@ def run(self) -> None:
                     stack.enter_context(closing(self.consumer))
                 except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
                     LOG.warning("[Consumer] No Brokers available yet. Retrying")
-                    self._stop_schema_reader.wait(timeout=2.0)
+                    self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
                 except KafkaConfigurationError:
-                    LOG.info("[Consumer] Invalid configuration. Bailing")
+                    LOG.warning("[Consumer] Invalid configuration. Bailing")
+                    self._stop_schema_reader.set()
                     raise
                 except Exception as e:  # pylint: disable=broad-except
                     LOG.exception("[Consumer] Unexpected exception. Retrying")
@@ -242,9 +247,12 @@ def run(self) -> None:
                     self.offset = self._get_beginning_offset()
                 try:
                     self.handle_messages()
+                except ShutdownException:
+                    self._stop_schema_reader.set()
+                    shutdown()
                 except Exception as e:  # pylint: disable=broad-except
                     self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
-                    LOG.exception("Unexpected exception in schema reader loop")
+                    LOG.warning("Unexpected exception in schema reader loop - %s", e)
 
     def _get_beginning_offset(self) -> int:
         assert self.consumer is not None, "Thread must be started"
@@ -352,11 +360,11 @@ def handle_messages(self) -> None:
                 assert message_key is not None
                 key = json_decode(message_key)
-            except JSONDecodeError:
-                # Invalid entry shall also move the offset so Karapace makes progress towards ready state.
-                self.offset = msg.offset()
+            except JSONDecodeError as exc:
                 LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
-                continue
+                self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
+                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+                continue  # [non-strict mode]
             except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
                 LOG.error(
                     "Kafka authorization error when consuming from %s: %s %s",
@@ -364,6 +372,8 @@
                     exc,
                     msg.error(),
                 )
+                if self.kafka_error_handler.schema_reader_strict_mode:
+                    raise ShutdownException from exc
                 continue
 
             assert isinstance(key, dict)
@@ -381,14 +391,19 @@ def handle_messages(self) -> None:
             if message_value:
                 try:
                     value = self._parse_message_value(message_value)
-                except JSONDecodeError:
-                    # Invalid entry shall also move the offset so Karapace makes progress towards ready state.
-                    self.offset = msg.offset()
+                except (JSONDecodeError, TypeError) as exc:
                     LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
-                    continue
+                    self.offset = msg.offset()  # Invalid entry shall also move the offset so Karapace makes progress.
+                    self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+                    continue  # [non-strict mode]
 
-            self.handle_msg(key, value)
-            self.offset = msg.offset()
+            try:
+                self.handle_msg(key, value)
+            except (InvalidSchema, TypeError) as exc:
+                self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
+                continue
+            finally:
+                self.offset = msg.offset()
 
             if msg_keymode == KeyMode.CANONICAL:
                 schema_records_processed_keymode_canonical += 1
@@ -512,9 +527,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
 
         try:
             schema_type_parsed = SchemaType(schema_type)
-        except ValueError:
+        except ValueError as exc:
             LOG.warning("Invalid schema type: %s", schema_type)
-            return
+            raise InvalidSchema from exc
 
         # This does two jobs:
         # - Validates the schema's JSON
@@ -528,9 +543,9 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
         if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
             try:
                 schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
-            except json.JSONDecodeError:
+            except json.JSONDecodeError as exc:
                 LOG.warning("Schema is not valid JSON")
-                return
+                raise InvalidSchema from exc
         elif schema_type_parsed == SchemaType.PROTOBUF:
             try:
                 if schema_references:
@@ -544,12 +559,12 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
                     normalize=False,
                 )
                 schema_str = str(parsed_schema)
-            except InvalidSchema:
-                LOG.exception("Schema is not valid ProtoBuf definition")
-                return
-            except InvalidReferences:
-                LOG.exception("Invalid Protobuf references")
-                return
+            except (InvalidSchema, ProtobufException) as exc:
+                LOG.warning("Schema is not valid ProtoBuf definition")
+                raise InvalidSchema from exc
+            except InvalidReferences as exc:
+                LOG.warning("Invalid Protobuf references")
+                raise InvalidSchema from exc
 
         try:
             typed_schema = TypedSchema(
@@ -559,8 +574,8 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
                 dependencies=resolved_dependencies,
                 schema=parsed_schema,
             )
-        except (InvalidSchema, JSONDecodeError):
-            return
+        except (InvalidSchema, JSONDecodeError) as exc:
+            raise InvalidSchema from exc
 
         self.database.insert_schema_version(
             subject=schema_subject,
@@ -588,13 +603,15 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
                     self._handle_msg_delete_subject(key, value)
                 elif message_type == MessageType.no_operation:
                     pass
-            except (KeyError, ValueError):
+            except (KeyError, ValueError) as exc:
                 LOG.warning("The message %r-%r has been discarded because the %s is not managed", key, value, key["keytype"])
+                raise InvalidSchema("Unrecognized `keytype` within schema") from exc
         else:
             LOG.warning(
                 "The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
             )
+            raise InvalidSchema("Message key doesn't contain the `keytype` attribute")
 
     def remove_referenced_by(
         self,
diff --git a/karapace/utils.py b/karapace/utils.py
index 96f87da8c..071b3e9d3 100644
--- a/karapace/utils.py
+++ b/karapace/utils.py
@@ -20,6 +20,7 @@
 
 import importlib
 import logging
+import signal
 import time
 
 if importlib.util.find_spec("ujson"):
@@ -256,3 +257,11 @@ def remove_prefix(string: str, prefix: str) -> str:
         i += 1
 
     return string[i:]
+
+
+def shutdown():
+    """
+    Send a SIGTERM to the currently running application process, which should initiate shutdown logic.
+    """
+    LOG.warning("=======> Sending shutdown signal `SIGTERM` to Application process <=======")
+    signal.raise_signal(signal.SIGTERM)
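For context on the helper above: `shutdown()` only raises `SIGTERM` in-process; whatever signal handling the running application has installed is what actually stops it. A minimal, self-contained sketch (not Karapace's entry point) of a process reacting to that signal:

```python
import signal
import sys

from karapace.utils import shutdown


def _on_sigterm(signum: int, frame) -> None:
    # A real service would close consumers/producers and flush state here.
    print(f"received signal {signum}, shutting down")
    sys.exit(0)


signal.signal(signal.SIGTERM, _on_sigterm)
shutdown()  # raises SIGTERM in this process; _on_sigterm runs and exits
```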
diff --git a/tests/unit/test_kafka_error_handler.py b/tests/unit/test_kafka_error_handler.py
new file mode 100644
index 000000000..45e9fea1b
--- /dev/null
+++ b/tests/unit/test_kafka_error_handler.py
@@ -0,0 +1,78 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+from _pytest.logging import LogCaptureFixture
+from karapace.errors import CorruptKafkaRecordException
+from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
+
+import aiokafka.errors as Errors
+import logging
+import pytest
+
+
+@pytest.fixture(name="kafka_error_handler")
+def fixture_kafka_error_handler() -> KafkaErrorHandler:
+    config = {
+        "kafka_schema_reader_strict_mode": False,
+        "kafka_retriable_errors_silenced": True,
+    }
+    return KafkaErrorHandler(config=config)
+
+
+@pytest.mark.parametrize(
+    "retriable_error",
+    [
+        Errors.NodeNotReadyError("node is still starting"),
+        Errors.GroupCoordinatorNotAvailableError("group is unavailable"),
+        Errors.NoBrokersAvailable("no brokers available"),
+    ],
+)
+def test_handle_error_retriable_schema_coordinator(
+    caplog: LogCaptureFixture,
+    kafka_error_handler: KafkaErrorHandler,
+    retriable_error: Errors.KafkaError,
+):
+    kafka_error_handler.retriable_errors_silenced = True
+    with caplog.at_level(logging.WARNING, logger="karapace.kafka_error_handler"):
+        kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error)
+
+        for log in caplog.records:
+            assert log.name == "karapace.kafka_error_handler"
+            assert log.levelname == "WARNING"
+            assert log.message == f"SCHEMA_COORDINATOR encountered error - {retriable_error}"
+
+    # Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour
+    kafka_error_handler.retriable_errors_silenced = False
+    with pytest.raises(retriable_error.__class__):
+        kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error)
+
+
+@pytest.mark.parametrize(
+    "nonretriable_error",
+    [
+        ValueError("value missing"),
+        Errors.GroupAuthorizationFailedError("authorization failed"),
+        Errors.InvalidCommitOffsetSizeError("invalid commit size"),
+    ],
+)
+def test_handle_error_nonretriable_schema_coordinator(
+    kafka_error_handler: KafkaErrorHandler, nonretriable_error: BaseException
+) -> None:
+    kafka_error_handler.retriable_errors_silenced = False
+    with pytest.raises(nonretriable_error.__class__):
+        kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error)
+
+    # Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour
+    kafka_error_handler.retriable_errors_silenced = True
+    kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error)
+
+
+def test_handle_error_schema_reader(kafka_error_handler: KafkaErrorHandler) -> None:
+    kafka_error_handler.schema_reader_strict_mode = True
+    with pytest.raises(CorruptKafkaRecordException):
+        kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception)
+
+    # Check that the config flag - `kafka_schema_reader_strict_mode` switches the behaviour
+    kafka_error_handler.schema_reader_strict_mode = False
+    kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception)
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index ed5a980e1..51633376c 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -2,7 +2,11 @@
 Copyright (c) 2024 Aiven Ltd
 See LICENSE for details
 """
-from karapace.utils import remove_prefix
+from _pytest.logging import LogCaptureFixture
+from karapace.utils import remove_prefix, shutdown
+from unittest.mock import patch
+
+import logging
 
 
 def test_remove_prefix_basic() -> None:
@@ -28,3 +32,16 @@ def test_remove_prefix_multiple_occurrences_of_prefix() -> None:
 def test_remove_prefix_empty_string() -> None:
     result = remove_prefix("", "hello ")
     assert result == ""
+
+
+def test_shutdown(caplog: LogCaptureFixture) -> None:
+    with caplog.at_level(logging.WARNING, logger="karapace.utils"):
+        with patch("karapace.utils.signal") as mock_signal:
+            mock_signal.SIGTERM = 15
+
+            shutdown()
+            mock_signal.raise_signal.assert_called_once_with(15)
+            for log in caplog.records:
+                assert log.name == "karapace.utils"
+                assert log.levelname == "WARNING"
+                assert log.message == "=======> Sending shutdown signal `SIGTERM` to Application process <======="