feat: improve health check to fail if schema_reader raises exceptions #957

Merged
merged 1 commit into from Oct 10, 2024
18 changes: 15 additions & 3 deletions src/karapace/karapace.py
@@ -12,6 +12,7 @@
from functools import partial
from http import HTTPStatus
from karapace.config import Config
from karapace.dataclasses import default_dataclass
from karapace.rapu import HTTPRequest, HTTPResponse, RestApp
from karapace.typing import JsonObject
from karapace.utils import json_encode
@@ -22,7 +23,14 @@
import aiohttp.web
import time

HealthHook: TypeAlias = Callable[[], Awaitable[JsonObject]]

@default_dataclass
class HealthCheck:
status: JsonObject
healthy: bool


HealthHook: TypeAlias = Callable[[], Awaitable[HealthCheck]]


class KarapaceBase(RestApp):
@@ -96,11 +104,15 @@ async def health(self, _request: Request) -> aiohttp.web.Response:
"process_uptime_sec": int(time.monotonic() - self._process_start_time),
"karapace_version": __version__,
}
status_code = HTTPStatus.OK
for hook in self.health_hooks:
resp.update(await hook())
check = await hook()
resp.update(check.status)
if not check.healthy:
status_code = HTTPStatus.SERVICE_UNAVAILABLE
return aiohttp.web.Response(
body=json_encode(resp, binary=True, compact=True),
status=HTTPStatus.OK.value,
status=status_code.value,
headers={"Content-Type": "application/json"},
)
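For illustration, a sketch of the merged `/_health` body under this change (values are made up; field names come from the base handler above and the registry hook later in this PR). An unhealthy hook keeps the same body but flips the HTTP status to 503:

```json
{
  "process_uptime_sec": 42,
  "karapace_version": "x.y.z",
  "schema_registry_coordinator_running": true,
  "schema_registry_coordinator_generation_id": 1
}
```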

40 changes: 39 additions & 1 deletion src/karapace/schema_reader.py
@@ -22,7 +22,7 @@
)
from avro.schema import Schema as AvroSchema
from collections.abc import Mapping, Sequence
from confluent_kafka import Message, TopicPartition
from confluent_kafka import Message, TopicCollection, TopicPartition
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
@@ -48,6 +48,7 @@
from threading import Event, Thread
from typing import Final

import asyncio
import json
import logging
import time
@@ -62,6 +63,11 @@
KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS: Final = 2.0
SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS: Final = 5.0

# If handle_messages raises at least UNHEALTHY_CONSECUTIVE_ERRORS consecutive errors
# over a span of UNHEALTHY_TIMEOUT_SECONDS, the SchemaReader is reported unhealthy
UNHEALTHY_TIMEOUT_SECONDS: Final = 10.0
UNHEALTHY_CONSECUTIVE_ERRORS: Final = 3
Comment on lines +68 to +69
Contributor
Not mandatory for this PR, but it would be nice to have these configurable. Depending on the load and the cluster size, these numbers may need to be adjusted.
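A minimal sketch of what that could look like (the config keys below are hypothetical; this PR keeps the values as module-level constants):

```python
from typing import Any, Mapping


def health_check_thresholds(config: Mapping[str, Any]) -> tuple[float, int]:
    """Resolve health-check thresholds from config, defaulting to this PR's constants.

    The "unhealthy_timeout_seconds" / "unhealthy_consecutive_errors" keys are
    illustrative only and do not exist in Karapace's Config.
    """
    timeout_seconds = float(config.get("unhealthy_timeout_seconds", 10.0))
    consecutive_errors = int(config.get("unhealthy_consecutive_errors", 3))
    return timeout_seconds, consecutive_errors
```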


# For good startup performance the consumption of multiple
# records for each consume round is essential.
# Consumer default is 1 message for each consume call and after
@@ -176,6 +182,9 @@ def __init__(
self.start_time = time.monotonic()
self.startup_previous_processed_offset = 0

self.consecutive_unexpected_errors: int = 0
self.consecutive_unexpected_errors_start: float = 0

def close(self) -> None:
LOG.info("Closing schema_reader")
self._stop_schema_reader.set()
@@ -249,15 +258,44 @@ def run(self) -> None:
self.offset = self._get_beginning_offset()
try:
self.handle_messages()
self.consecutive_unexpected_errors = 0
except ShutdownException:
self._stop_schema_reader.set()
shutdown()
except KafkaUnavailableError:
self.consecutive_unexpected_errors += 1
Contributor

It would be nice to be able to differentiate the cause; e.g. if Kafka is down, Karapace obviously isn't going to work. But again, not something I want to see in this PR; as it is, it's a good improvement already.
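A rough sketch of one way to surface the cause (nothing below exists in this PR; all names are illustrative):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ErrorCauseTracker:
    """Remembers the last reader-loop failure so /_health could report it."""

    last_cause: Optional[str] = None

    def record(self, exc: BaseException) -> None:
        # e.g. "KafkaUnavailableError" vs. an unexpected "ValueError"
        self.last_cause = type(exc).__name__

    def reset(self) -> None:
        self.last_cause = None

    def status(self) -> dict:
        # Could be merged into the HealthCheck status payload.
        return {"schema_reader_last_error": self.last_cause}
```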

LOG.warning("Kafka cluster is unavailable or broker can't be resolved.")
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
self.consecutive_unexpected_errors += 1
if self.consecutive_unexpected_errors == 1:
self.consecutive_unexpected_errors_start = time.monotonic()
LOG.warning("Unexpected exception in schema reader loop - %s", e)

async def is_healthy(self) -> bool:
if (
self.consecutive_unexpected_errors >= UNHEALTHY_CONSECUTIVE_ERRORS
and (duration := time.monotonic() - self.consecutive_unexpected_errors_start) >= UNHEALTHY_TIMEOUT_SECONDS
):
LOG.warning(
"Health check failed with %s consecutive errors in %s seconds", self.consecutive_unexpected_errors, duration
)
return False

try:
# Explicitly check if the topic exists.
# This is needed because if the topic is missing, the consumer does not
# repeat the error on consecutive consume calls and instead returns an empty list.
assert self.admin_client is not None
topic = self.config["topic_name"]
res = self.admin_client.describe_topics(TopicCollection([topic]))
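# describe_topics returns concurrent.futures.Future objects;
# asyncio.wrap_future bridges the result into the running event
# loop so it can be awaited without blocking.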
await asyncio.wrap_future(res[topic])
except Exception as e: # pylint: disable=broad-except
LOG.warning("Health check failed with %r", e)
return False

return True

def _get_beginning_offset(self) -> int:
assert self.consumer is not None, "Thread must be started"

13 changes: 9 additions & 4 deletions src/karapace/schema_registry_apis.py
@@ -28,13 +28,13 @@
SubjectSoftDeletedException,
VersionNotFoundException,
)
from karapace.karapace import KarapaceBase
from karapace.karapace import HealthCheck, KarapaceBase
from karapace.protobuf.exception import ProtobufUnresolvedDependencyException
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.schema_registry import KarapaceSchemaRegistry
from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version
from karapace.typing import JsonData, SchemaId, Subject, Version
from karapace.utils import JSONDecodeError
from typing import Any

@@ -98,7 +98,7 @@ def __init__(self, config: Config) -> None:
self.app.on_startup.append(self._create_forward_client)
self.health_hooks.append(self.schema_registry_health)

async def schema_registry_health(self) -> JsonObject:
async def schema_registry_health(self) -> HealthCheck:
resp = {}
if self._auth is not None:
resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified
@@ -115,7 +115,12 @@ async def schema_registry_health(self) -> JsonObject:
resp["schema_registry_primary_url"] = cs.primary_url
resp["schema_registry_coordinator_running"] = cs.is_running
resp["schema_registry_coordinator_generation_id"] = cs.group_generation_id
return resp

healthy = True
if not await self.schema_registry.schema_reader.is_healthy():
healthy = False

return HealthCheck(status=resp, healthy=healthy)

async def _start_schema_registry(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument
"""Callback for aiohttp.Application.on_startup"""
2 changes: 2 additions & 0 deletions stubs/confluent_kafka/__init__.pyi
@@ -8,6 +8,7 @@ from .cimpl import (
TIMESTAMP_CREATE_TIME,
TIMESTAMP_LOG_APPEND_TIME,
TIMESTAMP_NOT_AVAILABLE,
TopicCollection,
TopicPartition,
)

@@ -22,4 +23,5 @@
"TIMESTAMP_LOG_APPEND_TIME",
"TIMESTAMP_NOT_AVAILABLE",
"TopicPartition",
"TopicCollection",
)
3 changes: 2 additions & 1 deletion stubs/confluent_kafka/admin/__init__.pyi
@@ -4,7 +4,7 @@ from ._listoffsets import ListOffsetsResultInfo, OffsetSpec
from ._metadata import BrokerMetadata, ClusterMetadata, PartitionMetadata, TopicMetadata
from ._resource import ResourceType
from concurrent.futures import Future
from confluent_kafka import IsolationLevel, TopicPartition
from confluent_kafka import IsolationLevel, TopicCollection, TopicPartition
from typing import Callable

__all__ = (
@@ -52,3 +52,4 @@ class AdminClient:
def describe_configs(
self, resources: list[ConfigResource], request_timeout: float = -1
) -> dict[ConfigResource, Future[dict[str, ConfigEntry]]]: ...
def describe_topics(self, topics: TopicCollection) -> dict[str, Future]: ...
7 changes: 7 additions & 0 deletions stubs/confluent_kafka/cimpl.pyi
@@ -48,6 +48,13 @@ class TopicPartition:
self.leader_epoch: int | None
self.error: KafkaError | None

class TopicCollection:
def __init__(
self,
topic_names: list[str],
) -> None:
self.topic_names: list[str]

class Message:
def offset(self) -> int: ...
def timestamp(self) -> tuple[int, int]: ...
27 changes: 27 additions & 0 deletions tests/integration/test_health_check.py
@@ -0,0 +1,27 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from tenacity import retry, stop_after_delay, wait_fixed
from tests.integration.utils.cluster import RegistryDescription

import http


async def test_health_check(
registry_cluster: RegistryDescription, registry_async_client: Client, admin_client: KafkaAdminClient
) -> None:
res = await registry_async_client.get("/_health")
assert res.ok

admin_client.delete_topic(registry_cluster.schemas_topic)

@retry(stop=stop_after_delay(10), wait=wait_fixed(1), reraise=True)
async def check_health():
res = await registry_async_client.get("/_health")
assert res.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE, "should report unhealthy after topic has been deleted"

await check_health()
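For reproducing what this test asserts by hand, a minimal probe sketch (host and port are assumptions; adjust to your registry instance):

```python
import asyncio
import http

import aiohttp


async def probe_health(base_url: str = "http://localhost:8081") -> None:
    # GET the health endpoint and report whether the registry considers
    # itself healthy (200) or unhealthy (503 after this PR).
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}/_health") as resp:
            body = await resp.json()
            print(f"status={resp.status} healthy={resp.status == http.HTTPStatus.OK}")
            print(body)


if __name__ == "__main__":
    asyncio.run(probe_health())
```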
83 changes: 81 additions & 2 deletions tests/unit/test_schema_reader.py
@@ -6,7 +6,7 @@
"""

from _pytest.logging import LogCaptureFixture
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future, ThreadPoolExecutor
from confluent_kafka import Message
from dataclasses import dataclass
from karapace.config import DEFAULTS
@@ -25,9 +25,10 @@
)
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from pytest import MonkeyPatch
from tests.base_testcase import BaseTestCase
from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref
from typing import Callable
from typing import Callable, Optional
from unittest.mock import Mock

import confluent_kafka
@@ -325,6 +326,84 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None:
assert log.message == "Hard delete: version: Version(2) for subject: 'test-subject' did not exist, should have"


@dataclass
class HealthCheckTestCase(BaseTestCase):
current_time: float
consecutive_unexpected_errors: int
consecutive_unexpected_errors_start: float
healthy: bool
check_topic_error: Optional[Exception] = None


@pytest.mark.parametrize(
"testcase",
[
HealthCheckTestCase(
test_name="No errors",
current_time=0,
consecutive_unexpected_errors=0,
consecutive_unexpected_errors_start=0,
healthy=True,
),
HealthCheckTestCase(
test_name="10 errors in 5 seconds",
current_time=5,
consecutive_unexpected_errors=10,
consecutive_unexpected_errors_start=0,
healthy=True,
),
HealthCheckTestCase(
test_name="1 error in 20 seconds",
current_time=20,
consecutive_unexpected_errors=1,
consecutive_unexpected_errors_start=0,
healthy=True,
),
HealthCheckTestCase(
test_name="3 errors in 10 seconds",
current_time=10,
consecutive_unexpected_errors=3,
consecutive_unexpected_errors_start=0,
healthy=False,
),
HealthCheckTestCase(
test_name="check topic error",
current_time=5,
consecutive_unexpected_errors=1,
consecutive_unexpected_errors_start=0,
healthy=False,
check_topic_error=Exception("Something's wrong"),
),
],
)
async def test_schema_reader_health_check(testcase: HealthCheckTestCase, monkeypatch: MonkeyPatch) -> None:
offset_watcher = OffsetWatcher()
key_formatter_mock = Mock()
admin_client_mock = Mock()

empty_future = Future()
if testcase.check_topic_error:
empty_future.set_exception(testcase.check_topic_error)
else:
empty_future.set_result(None)
admin_client_mock.describe_topics.return_value = {DEFAULTS["topic_name"]: empty_future}

schema_reader = KafkaSchemaReader(
config=DEFAULTS,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)

monkeypatch.setattr(time, "monotonic", lambda: testcase.current_time)
schema_reader.admin_client = admin_client_mock
schema_reader.consecutive_unexpected_errors = testcase.consecutive_unexpected_errors
schema_reader.consecutive_unexpected_errors_start = testcase.consecutive_unexpected_errors_start

assert await schema_reader.is_healthy() == testcase.healthy


@dataclass
class KafkaMessageHandlingErrorTestCase(BaseTestCase):
key: bytes