From e3c51fc072cdc707a4931a154efca5882ca61eee Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Thu, 20 Jun 2024 12:22:47 +0200
Subject: [PATCH 1/2] karapace: avoid starting until the topic is successfully created

The underlying `create_topics` call is asynchronous unless an
`operation_timeout` is given, so its effects are not immediately visible
after the call returns. If Karapace is started against a cluster that is
slow enough in creating the topic, the service startup fails with an
unhandled exception of the form:

```
aiokafka.errors.UnknownTopicOrPartitionError: [Error 3] UnknownTopicOrPartitionError
```

The fix is simple: immediately after requesting the topic creation, query
the cluster for the topic's offsets. If the returned status is
`OFFSET_UNINITIALIZED`, we simply need to try again shortly later.
---
 karapace/schema_reader.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index 9bcd69260..ecf0ce69b 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -240,11 +240,12 @@ def run(self) -> None:
                 # Handles also a unusual case of purged schemas topic where starting offset can be > 0
                 # and no records to process.
                 self.offset = self._get_beginning_offset()
-            try:
-                self.handle_messages()
-            except Exception as e:  # pylint: disable=broad-except
-                self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
-                LOG.exception("Unexpected exception in schema reader loop")
+            else:
+                try:
+                    self.handle_messages()
+                except Exception as e:  # pylint: disable=broad-except
+                    self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
+                    LOG.exception("Unexpected exception in schema reader loop")
 
     def _get_beginning_offset(self) -> int:
         assert self.consumer is not None, "Thread must be started"
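The retry loop described in the commit message above could look roughly like the following sketch. This is an illustration only, not part of the patch: `admin.get_beginning_offset` is a hypothetical stand-in for whatever client call exposes the topic's start offset, and the `-2` sentinel for `OFFSET_UNINITIALIZED` is an assumption, not Karapace's actual constant.

```python
import time

OFFSET_UNINITIALIZED = -2  # assumed sentinel value; Karapace's real constant may differ


def wait_for_topic_initialization(admin, topic: str, timeout: float = 30.0, interval: float = 0.5) -> None:
    """Block until the schemas topic reports an initialized beginning offset.

    `admin.get_beginning_offset` is a hypothetical stand-in for the client call
    that exposes the topic's start offset; it is not Karapace's real API.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Topic creation is asynchronous: right after create_topics() the metadata
        # may not have propagated yet, so the offset query can still come back uninitialized.
        if admin.get_beginning_offset(topic, partition=0) != OFFSET_UNINITIALIZED:
            return
        time.sleep(interval)  # try again shortly later, as the commit message suggests
    raise TimeoutError(f"topic {topic!r} was not initialized within {timeout} seconds")
```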
From e8467ae755bbe9d6994e2e6274a5da057708d71a Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Thu, 20 Jun 2024 15:09:53 +0200
Subject: [PATCH 2/2] WIP: add integration test for schema topic creation at startup

---
 tests/integration/test_schema_reader.py | 40 +++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py
index 738f76498..936c28adc 100644
--- a/tests/integration/test_schema_reader.py
+++ b/tests/integration/test_schema_reader.py
@@ -287,3 +287,43 @@ async def test_key_format_detection(
         assert key_formatter.get_keymode() == testcase.expected
     finally:
         await master_coordinator.close()
+
+
+# TODO: run the `KafkaSchemaReader` process in the background and collect its logs to assert
+# that it started without raising any exception (or reporting an error status).
+async def test_schema_topic_creation_is_needed(
+    kafka_servers: KafkaServers,
+) -> None:
+    """Test that the schema registry can start successfully when it has to create the schemas topic from scratch."""
+    test_name = "test_schema_topic_creation_is_needed"
+    topic_name = new_random_name("topic")
+    group_id = create_group_name_factory(test_name)()
+
+    config = set_config_defaults(
+        {
+            "bootstrap_uri": kafka_servers.bootstrap_servers,
+            "admin_metadata_max_age": 2,
+            "group_id": group_id,
+            "topic_name": topic_name,
+        }
+    )
+    master_coordinator = MasterCoordinator(config=config)
+
+    try:
+        await master_coordinator.start()
+        database = InMemoryDatabase()
+        offset_watcher = OffsetWatcher()
+
+        schema_reader = KafkaSchemaReader(
+            config=config,
+            offset_watcher=offset_watcher,
+            key_formatter=KeyFormatter(),
+            master_coordinator=master_coordinator,
+            database=database,
+        )
+        schema_reader.start()
+
+        with closing(schema_reader):
+            await _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader)
+    finally:
+        await master_coordinator.close()
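The test relies on a `_wait_until_reader_is_ready_and_master` helper that is referenced but not shown in this hunk. A minimal sketch of what such a helper could look like, assuming the reader exposes a `ready` flag and the coordinator exposes a `get_master_info()` result (both are assumptions about the surrounding test utilities, not verified Karapace APIs):

```python
import asyncio


async def _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader, timeout: float = 60.0) -> None:
    """Poll until the schema reader has caught up and this node has been elected master.

    The `schema_reader.ready` attribute and `master_coordinator.get_master_info()` call
    are assumptions about the test utilities, not verified Karapace APIs.
    """

    async def _poll() -> None:
        while True:
            are_we_master, _ = master_coordinator.get_master_info()
            if schema_reader.ready and are_we_master:
                return
            await asyncio.sleep(0.5)

    await asyncio.wait_for(_poll(), timeout=timeout)
```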