Skip to content

Commit

Permalink
fix: unknown Kafka errors with _RESOLVE error code
Browse files Browse the repository at this point in the history
Also don't log KafkaUnavailableError as unexpected error
  • Loading branch information
keejon committed Sep 30, 2024
1 parent 7b7a3cc commit 2c13567
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
from __future__ import annotations

from aiokafka.client import UnknownTopicOrPartitionError
from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable
from aiokafka.errors import (
AuthenticationFailedError,
for_code,
IllegalStateError,
KafkaTimeoutError,
KafkaUnavailableError,
NoBrokersAvailable,
)
from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
Expand Down Expand Up @@ -52,6 +59,8 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception:
return KafkaTimeoutError()
if code == KafkaError._STATE: # pylint: disable=protected-access
return IllegalStateError()
if code == KafkaError._RESOLVE: # pylint: disable=protected-access
return KafkaUnavailableError()

return for_code(code)

Expand Down
3 changes: 3 additions & 0 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
InvalidReplicationFactorError,
KafkaConfigurationError,
KafkaTimeoutError,
KafkaUnavailableError,
LeaderNotAvailableError,
NoBrokersAvailable,
NodeNotReadyError,
Expand Down Expand Up @@ -250,6 +251,8 @@ def run(self) -> None:
except ShutdownException:
self._stop_schema_reader.set()
shutdown()
except KafkaUnavailableError:
LOG.warning("Kafka cluster is unavailable or broker can't be resolved.")
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.warning("Unexpected exception in schema reader loop - %s", e)
Expand Down

0 comments on commit 2c13567

Please sign in to comment.