Skip to content

Commit

Permalink
WIP:
Browse files Browse the repository at this point in the history
1. the schema_coordinator now waits until the schema_reader has reached the last message in the topic before declaring itself master
2. the `_ready` flag of the schema_reader is now protected by a lock, since the schema_coordinator can now also reset the `_ready` flag
3. the coordinator is now closed in the `close` method instead of in the run method
  • Loading branch information
eliax1996 committed Nov 11, 2024
1 parent c668a98 commit dfdb80c
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 38 deletions.
21 changes: 17 additions & 4 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from threading import Thread
from typing import Final

import asyncio
import logging
import time

__all__ = ("MasterCoordinator",)


LOG = logging.getLogger(__name__)


Expand All @@ -42,6 +45,10 @@ def __init__(self, config: Config) -> None:
self._sc: SchemaCoordinator | None = None
self._thread: Thread = Thread(target=self._start_loop, daemon=True)
self._loop: asyncio.AbstractEventLoop | None = None
self._schema_reader_stopper: SchemaReaderStoppper | None = None

def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None:
    """Register the schema-reader stopper used by this coordinator.

    The stored object is later passed to the ``SchemaCoordinator`` built by
    ``init_schema_coordinator``, which asserts that it has been set.
    """
    self._schema_reader_stopper = schema_reader_stopper

@property
def schema_coordinator(self) -> SchemaCoordinator | None:
Expand Down Expand Up @@ -84,14 +91,17 @@ async def _async_loop(self) -> None:
self._sc = self.init_schema_coordinator()
while self._running:
if self._sc.ready():
return
break
await asyncio.sleep(0.5)

# todo: wait on a condition variable or a lock.
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._loop:
self._loop.close()
while self._loop is not None and not self._loop.is_closed():
self._loop.stop()
if not self._loop.is_running():
self._loop.close()
time.sleep(0.5)
if self._kafka_client:
await self._kafka_client.close()

Expand Down Expand Up @@ -119,8 +129,10 @@ def init_kafka_client(self) -> AIOKafkaClient:

def init_schema_coordinator(self) -> SchemaCoordinator:
assert self._kafka_client is not None
assert self._schema_reader_stopper is not None
schema_coordinator = SchemaCoordinator(
client=self._kafka_client,
schema_reader_stopper=self._schema_reader_stopper,
election_strategy=self._config.get("master_election_strategy", "lowest"),
group_id=self._config["group_id"],
hostname=self._config["advertised_hostname"],
Expand Down Expand Up @@ -159,3 +171,4 @@ def get_master_info(self) -> tuple[bool | None, str | None]:

async def close(self) -> None:
    """Request shutdown by clearing the ``_running`` flag polled by the async loop."""
    # todo: set the condition variable or lock.
    self._running = False
14 changes: 9 additions & 5 deletions src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from aiokafka.util import create_future, create_task
from collections.abc import Coroutine, Sequence
from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData
from karapace.typing import JsonData, SchemaReaderStoppper
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Any, Final
Expand Down Expand Up @@ -123,6 +123,7 @@ class SchemaCoordinator:
def __init__(
self,
client: AIOKafkaClient,
schema_reader_stopper: SchemaReaderStoppper,
hostname: str,
port: int,
scheme: str,
Expand All @@ -147,6 +148,7 @@ def __init__(
self.scheme: Final = scheme
self.master_eligibility: Final = master_eligibility
self.master_url: str | None = None
self._schema_reader_stopper = schema_reader_stopper
self._are_we_master: bool | None = False
# a value that is strictly higher than any clock, so we are sure
# we are never going to consider this the leader without explicitly passing
Expand Down Expand Up @@ -212,7 +214,7 @@ def are_we_master(self) -> bool | None:
LOG.warning("No new elections performed yet.")
return None

if not self._ready:
if not self._ready or not self._schema_reader_stopper.ready():
return False

if self._are_we_master and self._initial_election_sec is not None:
Expand All @@ -224,7 +226,7 @@ def are_we_master(self) -> bool | None:
self._initial_election_sec = None
# this is the last point in time where we wait until the end of the log queue for new
# incoming messages.
self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same
self._schema_reader_stopper.set_not_ready()
# flag that its set at startup, fix this
return False

Expand Down Expand Up @@ -485,7 +487,7 @@ async def _on_join_complete(
# was a master change, the time before acting its always respect
# to which was the previous master (if we were master no need
# to wait more before acting)
self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same
self._schema_reader_stopper.set_not_ready()
# flag that its set at startup, fix this
# `time.monotonic()` because we don't want the time to go back or forward because of e.g. ntp
self._initial_election_sec = time.monotonic()
Expand All @@ -506,7 +508,7 @@ async def _on_join_complete(
self.master_url = None
self._are_we_master = False
else:
LOG.info("We are not elected as master", member_id)
LOG.info("We are not elected as master")
self.master_url = master_url
self._are_we_master = False
self._ready = True
Expand All @@ -519,13 +521,15 @@ def coordinator_dead(self) -> None:
"""
if self._coordinator_dead_fut is not None and self.coordinator_id is not None:
LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id)
self._are_we_master = False
self.coordinator_id = None
self._coordinator_dead_fut.set_result(None)

def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
need to re-join the group.
"""
self._are_we_master = False
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.request_rejoin()
Expand Down
55 changes: 40 additions & 15 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, Subject, Version
from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version
from karapace.utils import json_decode, JSONDecodeError, shutdown
from threading import Event, Thread
from threading import Event, Lock, Thread
from typing import Final

import asyncio
Expand Down Expand Up @@ -129,7 +129,7 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient:
)


class KafkaSchemaReader(Thread):
class KafkaSchemaReader(Thread, SchemaReaderStoppper):
def __init__(
self,
config: Config,
Expand Down Expand Up @@ -167,7 +167,10 @@ def __init__(
# old stale version that has not been deleted yet.)
self.offset = OFFSET_UNINITIALIZED
self._highest_offset = OFFSET_UNINITIALIZED
self.ready = False
# when a node is elected as master we should read the last arrived messages at least
# once. This lock prevents concurrent modification of the `_ready` flag.
self._ready_lock = Lock()
self._ready = False

# This event controls when the Reader should stop running, it will be
# set by another thread (e.g. `KarapaceSchemaRegistry`)
Expand Down Expand Up @@ -319,9 +322,10 @@ def _get_beginning_offset(self) -> int:
return OFFSET_UNINITIALIZED

def _is_ready(self) -> bool:
if self.ready:
return True

"""
Must only be called when `self._ready` is False.
The early-return check was removed because, now that the flag is guarded by the Lock, the lookup is a costly operation.
"""
assert self.consumer is not None, "Thread must be started"

try:
Expand Down Expand Up @@ -365,6 +369,14 @@ def _is_ready(self) -> bool:
def highest_offset(self) -> int:
return max(self._highest_offset, self._offset_watcher.greatest_offset())

def ready(self) -> bool:
    """Return the current value of the ``_ready`` flag, read under ``_ready_lock``."""
    with self._ready_lock:
        is_ready = self._ready
    return is_ready

def set_not_ready(self) -> None:
    """Reset the ``_ready`` flag to ``False`` while holding ``_ready_lock``."""
    lock = self._ready_lock
    with lock:
        self._ready = False

@staticmethod
def _parse_message_value(raw_value: str | bytes) -> JsonObject | None:
value = json_decode(raw_value)
Expand All @@ -376,10 +388,8 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None:

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"

msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process)
if self.ready is False:
self.ready = self._is_ready()
self._update_is_ready_flag()

watch_offsets = False
if self.master_coordinator is not None:
Expand Down Expand Up @@ -433,9 +443,10 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:
# Default keymode is CANONICAL and preferred unless any data consumed
# has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE
# the subsequent keys are omitted from detection.
if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL:
if msg_keymode == KeyMode.DEPRECATED_KARAPACE:
self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE)
with self._ready_lock:
if not self._ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL:
if msg_keymode == KeyMode.DEPRECATED_KARAPACE:
self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE)

value = None
message_value = msg.value()
Expand All @@ -461,14 +472,28 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None:
else:
schema_records_processed_keymode_deprecated_karapace += 1

if self.ready and watch_offsets:
self._offset_watcher.offset_seen(self.offset)
with self._ready_lock:
if self._ready and watch_offsets:
self._offset_watcher.offset_seen(self.offset)

self._report_schema_metrics(
schema_records_processed_keymode_canonical,
schema_records_processed_keymode_deprecated_karapace,
)

def _update_is_ready_flag(self) -> None:
update_ready_flag = False

# to keep the lock as few as possible.
with self._ready_lock:
if self._ready is False:
update_ready_flag = True

if update_ready_flag:
new_ready_flag = self._is_ready()
with self._ready_lock:
self._ready = new_ready_flag

def _report_schema_metrics(
self,
schema_records_processed_keymode_canonical: int,
Expand Down
9 changes: 8 additions & 1 deletion src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ def __init__(self, config: Config) -> None:
master_coordinator=self.mc,
database=self.database,
)
# Very ugly, left as a placeholder: the bidirectional dependency means
# that the two objects need to become one (i.e. the mc should create the
# KafkaSchemaReader and inject the stopper into the schema_coordinator).
# Left as it is so that we can reason about the implementation together,
# since semantically it's the same; once we agree on the solution we can
# proceed with the refactor.
self.mc.set_stoppper(self.schema_reader)

self.schema_lock = asyncio.Lock()
self._master_lock = asyncio.Lock()
Expand Down Expand Up @@ -96,7 +103,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
are_we_master, master_url = self.mc.get_master_info()
if are_we_master is None:
LOG.info("No master set: %r, url: %r", are_we_master, master_url)
elif not ignore_readiness and self.schema_reader.ready is False:
elif not ignore_readiness and self.schema_reader.ready() is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
else:
return are_we_master, master_url
Expand Down
5 changes: 3 additions & 2 deletions src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class SchemaErrorMessages(Enum):

class KarapaceSchemaRegistryController(KarapaceBase):
def __init__(self, config: Config) -> None:
# the `not_ready_handler` is wrong: the receiver is not expecting an async method.
super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve)

self._auth: HTTPAuthorizer | None = None
Expand All @@ -104,7 +105,7 @@ async def schema_registry_health(self) -> HealthCheck:
if self._auth is not None:
resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified
resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready
if self.schema_registry.schema_reader.ready:
if self.schema_registry.schema_reader.ready():
resp["schema_registry_startup_time_sec"] = (
self.schema_registry.schema_reader.last_check - self._process_start_time
)
Expand Down Expand Up @@ -141,7 +142,7 @@ def _check_authorization(self, user: User | None, operation: Operation, resource
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)

async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None:
if self.schema_registry.schema_reader.ready:
if self.schema_registry.schema_reader.ready():
pass
else:
# Not ready, still loading the state.
Expand Down
11 changes: 11 additions & 0 deletions src/karapace/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from enum import Enum, unique
from karapace.errors import InvalidVersion
Expand Down Expand Up @@ -102,3 +103,13 @@ def value(self) -> int:
@property
def is_latest(self) -> bool:
return self.value == self.MINUS_1_VERSION_TAG


class SchemaReaderStoppper(ABC):
    """Abstract interface for querying and resetting a readiness flag.

    Implementations expose whether they are ready and allow a caller to
    force them back into the not-ready state.
    """

    @abstractmethod
    def ready(self) -> bool:
        """Return ``True`` when the implementation considers itself ready."""

    @abstractmethod
    def set_not_ready(self) -> None:
        """Mark the implementation as not ready."""
1 change: 0 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_master_is_available, repeat_until_successful_request
from typing import AsyncIterator, Iterator
from urllib.parse import urlparse

import asyncio
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
from karapace.config import set_config_defaults
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.typing import SchemaReaderStoppper
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import allocate_port
from tests.integration.utils.rest_client import RetryRestClient
Expand All @@ -16,8 +17,17 @@
import pytest


class AlwaysAvailableSchemaReaderStoppper(SchemaReaderStoppper):
    """Test stub that always reports readiness and ignores reset requests."""

    def ready(self) -> bool:
        # Always ready, so code depending on this stub never waits on a reader.
        return True

    def set_not_ready(self) -> None:
        """Do nothing: this stub has no state to reset."""
        return None


async def init_admin(config):
    """Build a ``MasterCoordinator`` with an always-ready stopper, start it and return it."""
    coordinator = MasterCoordinator(config=config)
    # The stopper has to be set before the coordinator is started.
    coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
    await coordinator.start()
    return coordinator

Expand Down
Loading

0 comments on commit dfdb80c

Please sign in to comment.