Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

karapace: avoid starting until topic its successfully created #904

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,12 @@ def run(self) -> None:
# Handles also a unusual case of purged schemas topic where starting offset can be > 0
# and no records to process.
self.offset = self._get_beginning_offset()
try:
self.handle_messages()
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.exception("Unexpected exception in schema reader loop")
else:
try:
self.handle_messages()
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.exception("Unexpected exception in schema reader loop")

def _get_beginning_offset(self) -> int:
assert self.consumer is not None, "Thread must be started"
Expand Down
40 changes: 40 additions & 0 deletions tests/integration/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,43 @@ async def test_key_format_detection(
assert key_formatter.get_keymode() == testcase.expected
finally:
await master_coordinator.close()


# todo: run process of `KafkaSchemaReader` in background and collect logs to assert that
# it started without any exception raised (or status code in error)
async def test_schema_topic_creation_is_needed(
kafka_servers: KafkaServers,
) -> None:
"""Test that the schema registry can start successfully if the cluster needs create the schema registry from zero."""
test_name = "test_regression_soft_delete_schemas_should_be_registered"
topic_name = new_random_name("topic")
group_id = create_group_name_factory(test_name)()

config = set_config_defaults(
{
"bootstrap_uri": kafka_servers.bootstrap_servers,
"admin_metadata_max_age": 2,
"group_id": group_id,
"topic_name": topic_name,
}
)
master_coordinator = MasterCoordinator(config=config)

try:
await master_coordinator.start()
database = InMemoryDatabase()
offset_watcher = OffsetWatcher()

schema_reader = KafkaSchemaReader(
config=config,
offset_watcher=offset_watcher,
key_formatter=KeyFormatter(),
master_coordinator=master_coordinator,
database=database,
)
schema_reader.start()

with closing(schema_reader):
await _wait_until_reader_is_ready_and_master(master_coordinator, schema_reader)
finally:
await master_coordinator.close()
Loading