diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 9bcd69260..ecf0ce69b 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -240,11 +240,12 @@ def run(self) -> None: # Handles also a unusual case of purged schemas topic where starting offset can be > 0 # and no records to process. self.offset = self._get_beginning_offset() - try: - self.handle_messages() - except Exception as e: # pylint: disable=broad-except - self.stats.unexpected_exception(ex=e, where="schema_reader_loop") - LOG.exception("Unexpected exception in schema reader loop") + else: + try: + self.handle_messages() + except Exception as e: # pylint: disable=broad-except + self.stats.unexpected_exception(ex=e, where="schema_reader_loop") + LOG.exception("Unexpected exception in schema reader loop") def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 738f76498..936c28adc 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -287,3 +287,43 @@ async def test_key_format_detection( assert key_formatter.get_keymode() == testcase.expected finally: await master_coordinator.close() + + +# todo: run process of `KafkaSchemaReader` in background and collect logs to assert that +# it started without any exception raised (or status code in error) +async def test_schema_topic_creation_is_needed( + kafka_servers: KafkaServers, +) -> None: + """Test that the schema registry can start successfully if the cluster needs create the schema registry from zero.""" + test_name = "test_regression_soft_delete_schemas_should_be_registered" + topic_name = new_random_name("topic") + group_id = create_group_name_factory(test_name)() + + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + "admin_metadata_max_age": 2, + "group_id": group_id, + "topic_name": topic_name, + } + ) + master_coordinator = MasterCoordinator(config=config) + + try: + await master_coordinator.start() + database = InMemoryDatabase() + offset_watcher = OffsetWatcher() + + schema_reader = KafkaSchemaReader( + config=config, + offset_watcher=offset_watcher, + key_formatter=KeyFormatter(), + master_coordinator=master_coordinator, + database=database, + ) + schema_reader.start() + + with closing(schema_reader): + await _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader) + finally: + await master_coordinator.close()