From dc751ce1e05c92d8694a5f1cb954ed4008459013 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 15:47:44 -0300 Subject: [PATCH] Revert "schema_reader: added typing" This reverts commit 3087b9a35ee312b171e6fd4ab5e14695b79cf618. --- karapace/schema_reader.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 5adcd6f08..3dd2245dd 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -18,7 +18,7 @@ from karapace.utils import json_encode, KarapaceKafkaClient from queue import Queue from threading import Lock, Thread -from typing import Dict, Optional +from typing import Dict import json import logging @@ -120,7 +120,7 @@ def __init__(self, config, master_coordinator=None): sentry_config["tags"] = {} self.stats = StatsClient(sentry_config=sentry_config) - def init_consumer(self) -> None: + def init_consumer(self): # Group not set on purpose, all consumers read the same data session_timeout_ms = self.config["session_timeout_ms"] request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]) @@ -144,7 +144,7 @@ def init_consumer(self) -> None: metadata_max_age_ms=self.config["metadata_max_age_ms"], ) - def init_admin_client(self) -> bool: + def init_admin_client(self): try: self.admin_client = KafkaAdminClient( api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, @@ -168,7 +168,7 @@ def init_admin_client(self) -> bool: return False @staticmethod - def get_new_schema_topic(config: dict) -> NewTopic: + def get_new_schema_topic(config): return NewTopic( name=config["topic_name"], num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, @@ -176,7 +176,7 @@ def get_new_schema_topic(config: dict) -> NewTopic: topic_configs={"cleanup.policy": "compact"} ) - def create_schema_topic(self) -> bool: + def create_schema_topic(self): schema_topic = self.get_new_schema_topic(self.config) try: self.log.info("Creating topic: %r", schema_topic) @@ -193,7 +193,7 @@ def create_schema_topic(self) -> bool: time.sleep(5) return False - def get_schema_id(self, new_schema: TypedSchema) -> int: + def get_schema_id(self, new_schema): with self.id_lock: schemas = self.schemas.items() for schema_id, schema in schemas: @@ -203,11 +203,11 @@ def get_schema_id(self, new_schema: TypedSchema) -> int: self.global_schema_id += 1 return self.global_schema_id - def close(self) -> None: + def close(self): self.log.info("Closing schema_reader") self.running = False - def run(self) -> None: + def run(self): while self.running: try: if not self.admin_client: @@ -233,7 +233,7 @@ def run(self) -> None: self.stats.unexpected_exception(ex=e, where="schema_reader_exit") self.log.exception("Unexpected exception closing schema reader") - def handle_messages(self) -> None: + def handle_messages(self): raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms) if self.ready is False and raw_msgs == {}: self.ready = True @@ -270,8 +270,8 @@ def handle_messages(self) -> None: if self.ready and add_offsets: self.queue.put(self.offset) - def handle_msg(self, key: dict, value: Optional[dict]) -> None: - if key["keytype"] == "CONFIG" and value: + def handle_msg(self, key: dict, value: dict): + if key["keytype"] == "CONFIG": if "subject" in key and key["subject"] is not None: if not value: self.log.info("Deleting compatibility config completely for subject: %r", key["subject"]) @@ -349,7 +349,7 @@ def handle_msg(self, key: dict, value: Optional[dict]) -> None: self.schemas[value["id"]] = typed_schema if value["id"] > self.global_schema_id: # Not an existing schema self.global_schema_id = value["id"] - elif key["keytype"] == "DELETE_SUBJECT" and value: + elif key["keytype"] == "DELETE_SUBJECT": self.log.info("Deleting subject: %r, value: %r", value["subject"], value) if not value["subject"] in self.subjects: self.log.error("Subject: %r did not exist, should have", value["subject"])