diff --git a/src/karapace/backup/backends/v3/schema_tool.py b/src/karapace/backup/backends/v3/schema_tool.py index 65a3ea2bf..340be2477 100644 --- a/src/karapace/backup/backends/v3/schema_tool.py +++ b/src/karapace/backup/backends/v3/schema_tool.py @@ -86,7 +86,7 @@ def check_compatibility(git_target: str) -> None: for file in schema_directory.glob(f"*{extension}"): relative = relative_path(file) - if source_layout: + if not source_layout: relative = pathlib.Path(*relative.parts[1:]) with subprocess.Popen( ["git", "show", f"{git_target}:{relative}"], diff --git a/src/karapace/kafka/common.py b/src/karapace/kafka/common.py index 5df9838a2..add44eea3 100644 --- a/src/karapace/kafka/common.py +++ b/src/karapace/kafka/common.py @@ -6,7 +6,14 @@ from __future__ import annotations from aiokafka.client import UnknownTopicOrPartitionError -from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable +from aiokafka.errors import ( + AuthenticationFailedError, + for_code, + IllegalStateError, + KafkaTimeoutError, + KafkaUnavailableError, + NoBrokersAvailable, +) from collections.abc import Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException @@ -52,6 +59,8 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception: return KafkaTimeoutError() if code == KafkaError._STATE: # pylint: disable=protected-access return IllegalStateError() + if code == KafkaError._RESOLVE: # pylint: disable=protected-access + return KafkaUnavailableError() return for_code(code) diff --git a/src/karapace/schema_reader.py b/src/karapace/schema_reader.py index 01f07f379..cd04944dc 100644 --- a/src/karapace/schema_reader.py +++ b/src/karapace/schema_reader.py @@ -11,6 +11,7 @@ InvalidReplicationFactorError, KafkaConfigurationError, KafkaTimeoutError, + KafkaUnavailableError, LeaderNotAvailableError, NoBrokersAvailable, NodeNotReadyError, @@ -250,6 +251,8 @@ def run(self) -> None: except ShutdownException: self._stop_schema_reader.set() shutdown() + except KafkaUnavailableError: + LOG.warning("Kafka cluster is unavailable or broker can't be resolved.") except Exception as e: # pylint: disable=broad-except self.stats.unexpected_exception(ex=e, where="schema_reader_loop") LOG.warning("Unexpected exception in schema reader loop - %s", e) diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 74760897c..6936d10f0 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -11,6 +11,7 @@ class KafkaError: _UNKNOWN_PARTITION: int _TIMED_OUT: int _STATE: int + _RESOLVE: int UNKNOWN_TOPIC_OR_PART: int def code(self) -> int: ...