Skip to content

Commit

Permalink
Merge pull request #968 from Aiven-Open/keejon/fix-unknown-errors
Browse files Browse the repository at this point in the history
fix: unknown Kafka errors with _RESOLVE error code
  • Loading branch information
jjaakola-aiven authored Oct 1, 2024
2 parents 7b7a3cc + da7336f commit cc65f6a
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/karapace/backup/backends/v3/schema_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def check_compatibility(git_target: str) -> None:

for file in schema_directory.glob(f"*{extension}"):
relative = relative_path(file)
if source_layout:
if not source_layout:
relative = pathlib.Path(*relative.parts[1:])
with subprocess.Popen(
["git", "show", f"{git_target}:{relative}"],
Expand Down
11 changes: 10 additions & 1 deletion src/karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
from __future__ import annotations

from aiokafka.client import UnknownTopicOrPartitionError
from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable
from aiokafka.errors import (
AuthenticationFailedError,
for_code,
IllegalStateError,
KafkaTimeoutError,
KafkaUnavailableError,
NoBrokersAvailable,
)
from collections.abc import Iterable
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
Expand Down Expand Up @@ -52,6 +59,8 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception:
return KafkaTimeoutError()
if code == KafkaError._STATE: # pylint: disable=protected-access
return IllegalStateError()
if code == KafkaError._RESOLVE: # pylint: disable=protected-access
return KafkaUnavailableError()

return for_code(code)

Expand Down
3 changes: 3 additions & 0 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
InvalidReplicationFactorError,
KafkaConfigurationError,
KafkaTimeoutError,
KafkaUnavailableError,
LeaderNotAvailableError,
NoBrokersAvailable,
NodeNotReadyError,
Expand Down Expand Up @@ -250,6 +251,8 @@ def run(self) -> None:
except ShutdownException:
self._stop_schema_reader.set()
shutdown()
except KafkaUnavailableError:
LOG.warning("Kafka cluster is unavailable or broker can't be resolved.")
except Exception as e: # pylint: disable=broad-except
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
LOG.warning("Unexpected exception in schema reader loop - %s", e)
Expand Down
1 change: 1 addition & 0 deletions stubs/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class KafkaError:
_UNKNOWN_PARTITION: int
_TIMED_OUT: int
_STATE: int
_RESOLVE: int
UNKNOWN_TOPIC_OR_PART: int

def code(self) -> int: ...
Expand Down

0 comments on commit cc65f6a

Please sign in to comment.