Skip to content

Commit

Permalink
Revert "schema_reader: added typing"
Browse files Browse the repository at this point in the history
This reverts commit 3087b9a.
  • Loading branch information
Augusto F. Hack committed Dec 16, 2021
1 parent 423d6c8 commit dc751ce
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from karapace.utils import json_encode, KarapaceKafkaClient
from queue import Queue
from threading import Lock, Thread
from typing import Dict, Optional
from typing import Dict

import json
import logging
Expand Down Expand Up @@ -120,7 +120,7 @@ def __init__(self, config, master_coordinator=None):
sentry_config["tags"] = {}
self.stats = StatsClient(sentry_config=sentry_config)

def init_consumer(self) -> None:
def init_consumer(self):
# Group not set on purpose, all consumers read the same data
session_timeout_ms = self.config["session_timeout_ms"]
request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"])
Expand All @@ -144,7 +144,7 @@ def init_consumer(self) -> None:
metadata_max_age_ms=self.config["metadata_max_age_ms"],
)

def init_admin_client(self) -> bool:
def init_admin_client(self):
try:
self.admin_client = KafkaAdminClient(
api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS,
Expand All @@ -168,15 +168,15 @@ def init_admin_client(self) -> bool:
return False

@staticmethod
def get_new_schema_topic(config: dict) -> NewTopic:
def get_new_schema_topic(config):
return NewTopic(
name=config["topic_name"],
num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor=config["replication_factor"],
topic_configs={"cleanup.policy": "compact"}
)

def create_schema_topic(self) -> bool:
def create_schema_topic(self):
schema_topic = self.get_new_schema_topic(self.config)
try:
self.log.info("Creating topic: %r", schema_topic)
Expand All @@ -193,7 +193,7 @@ def create_schema_topic(self) -> bool:
time.sleep(5)
return False

def get_schema_id(self, new_schema: TypedSchema) -> int:
def get_schema_id(self, new_schema):
with self.id_lock:
schemas = self.schemas.items()
for schema_id, schema in schemas:
Expand All @@ -203,11 +203,11 @@ def get_schema_id(self, new_schema: TypedSchema) -> int:
self.global_schema_id += 1
return self.global_schema_id

def close(self) -> None:
def close(self):
self.log.info("Closing schema_reader")
self.running = False

def run(self) -> None:
def run(self):
while self.running:
try:
if not self.admin_client:
Expand All @@ -233,7 +233,7 @@ def run(self) -> None:
self.stats.unexpected_exception(ex=e, where="schema_reader_exit")
self.log.exception("Unexpected exception closing schema reader")

def handle_messages(self) -> None:
def handle_messages(self):
raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms)
if self.ready is False and raw_msgs == {}:
self.ready = True
Expand Down Expand Up @@ -270,8 +270,8 @@ def handle_messages(self) -> None:
if self.ready and add_offsets:
self.queue.put(self.offset)

def handle_msg(self, key: dict, value: Optional[dict]) -> None:
if key["keytype"] == "CONFIG" and value:
def handle_msg(self, key: dict, value: dict):
if key["keytype"] == "CONFIG":
if "subject" in key and key["subject"] is not None:
if not value:
self.log.info("Deleting compatibility config completely for subject: %r", key["subject"])
Expand Down Expand Up @@ -349,7 +349,7 @@ def handle_msg(self, key: dict, value: Optional[dict]) -> None:
self.schemas[value["id"]] = typed_schema
if value["id"] > self.global_schema_id: # Not an existing schema
self.global_schema_id = value["id"]
elif key["keytype"] == "DELETE_SUBJECT" and value:
elif key["keytype"] == "DELETE_SUBJECT":
self.log.info("Deleting subject: %r, value: %r", value["subject"], value)
if not value["subject"] in self.subjects:
self.log.error("Subject: %r did not exist, should have", value["subject"])
Expand Down

0 comments on commit dc751ce

Please sign in to comment.