Skip to content

Commit

Permalink
Merge pull request #295 from aiven/revert-release-global-ids
Browse files Browse the repository at this point in the history
Revert release global ids
  • Loading branch information
ivanyu authored Dec 16, 2021
2 parents 8b235f0 + dc751ce commit 432733f
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 219 deletions.
208 changes: 99 additions & 109 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
from karapace.utils import json_encode, KarapaceKafkaClient
from queue import Queue
from threading import Lock, Thread
from typing import Dict, Optional
from weakref import WeakValueDictionary
from typing import Dict

import json
import logging
Expand Down Expand Up @@ -105,6 +104,7 @@ def __init__(self, config, master_coordinator=None):
self.timeout_ms = 200
self.config = config
self.subjects = {}
self.schemas: Dict[int, TypedSchema] = {}
self.global_schema_id = 0
self.offset = 0
self.admin_client = None
Expand All @@ -120,12 +120,7 @@ def __init__(self, config, master_coordinator=None):
sentry_config["tags"] = {}
self.stats = StatsClient(sentry_config=sentry_config)

# A schema has the same `id` even if registered in two different subjects. This container
# has a weak reference to every schema in use, used to retrieve its `id`. Weak references
# are used to allow for free memory when a schema is cleared from all subjects.
self.schemas: Dict[int, TypedSchema] = WeakValueDictionary()

def init_consumer(self) -> None:
def init_consumer(self):
# Group not set on purpose, all consumers read the same data
session_timeout_ms = self.config["session_timeout_ms"]
request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"])
Expand All @@ -149,7 +144,7 @@ def init_consumer(self) -> None:
metadata_max_age_ms=self.config["metadata_max_age_ms"],
)

def init_admin_client(self) -> bool:
def init_admin_client(self):
try:
self.admin_client = KafkaAdminClient(
api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS,
Expand All @@ -173,15 +168,15 @@ def init_admin_client(self) -> bool:
return False

@staticmethod
def get_new_schema_topic(config: dict) -> NewTopic:
def get_new_schema_topic(config):
return NewTopic(
name=config["topic_name"],
num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
replication_factor=config["replication_factor"],
topic_configs={"cleanup.policy": "compact"}
)

def create_schema_topic(self) -> bool:
def create_schema_topic(self):
schema_topic = self.get_new_schema_topic(self.config)
try:
self.log.info("Creating topic: %r", schema_topic)
Expand All @@ -198,19 +193,21 @@ def create_schema_topic(self) -> bool:
time.sleep(5)
return False

def get_schema_id(self, new_schema: TypedSchema) -> int:
def get_schema_id(self, new_schema):
with self.id_lock:
schemas = self.schemas.items()
for schema_id, schema in schemas:
if schema == new_schema:
return schema_id
with self.id_lock:
for schema_id, schema in self.schemas.items():
if schema == new_schema:
return schema_id
self.global_schema_id += 1
return self.global_schema_id

def close(self) -> None:
def close(self):
self.log.info("Closing schema_reader")
self.running = False

def run(self) -> None:
def run(self):
while self.running:
try:
if not self.admin_client:
Expand All @@ -236,7 +233,7 @@ def run(self) -> None:
self.stats.unexpected_exception(ex=e, where="schema_reader_exit")
self.log.exception("Unexpected exception closing schema reader")

def handle_messages(self) -> None:
def handle_messages(self):
raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms)
if self.ready is False and raw_msgs == {}:
self.ready = True
Expand Down Expand Up @@ -273,111 +270,104 @@ def handle_messages(self) -> None:
if self.ready and add_offsets:
self.queue.put(self.offset)

def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None:
subject = key.get("subject")
if subject is not None:
if subject not in self.subjects:
self.log.info("Adding first version of subject: %r with no schemas", subject)
self.subjects[subject] = {"schemas": {}}

if not value:
self.log.info("Deleting compatibility config completely for subject: %r", subject)
self.subjects[subject].pop("compatibility", None)
def handle_msg(self, key: dict, value: dict):
if key["keytype"] == "CONFIG":
if "subject" in key and key["subject"] is not None:
if not value:
self.log.info("Deleting compatibility config completely for subject: %r", key["subject"])
self.subjects[key["subject"]].pop("compatibility", None)
return
self.log.info(
"Setting subject: %r config to: %r, value: %r", key["subject"], value["compatibilityLevel"], value
)
if not key["subject"] in self.subjects:
self.log.info("Adding first version of subject: %r with no schemas", key["subject"])
self.subjects[key["subject"]] = {"schemas": {}}
subject_data = self.subjects.get(key["subject"])
subject_data["compatibility"] = value["compatibilityLevel"]
else:
self.log.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value)
self.subjects[subject]["compatibility"] = value["compatibilityLevel"]
elif value is not None:
self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value)
self.config["compatibility"] = value["compatibilityLevel"]

def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: # pylint: disable=unused-argument
if value is None:
self.log.error("DELETE_SUBJECT record doesnt have a value, should have")
return

subject = value["subject"]
if subject not in self.subjects:
self.log.error("Subject: %r did not exist, should have", subject)
else:
self.log.info("Deleting subject: %r, value: %r", subject, value)
version = value["version"]
for schema in self.subjects[subject]["schemas"].values():
if schema["version"] <= version:
schema["deleted"] = True

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]

if subject not in self.subjects:
self.log.error("Hard delete: Subject %s did not exist, should have", subject)
elif version not in self.subjects[subject]["schemas"]:
self.log.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject)
else:
self.log.info("Hard delete: subject: %r version: %r", subject, version)
self.subjects[subject]["schemas"].pop(version, None)

def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None:
if not value:
self._handle_msg_schema_hard_delete(key)
return

schema_type = value.get("schemaType", "AVRO")
schema_str = value["schema"]
schema_subject = value["subject"]
schema_id = value["id"]
schema_version = value["version"]
schema_deleted = value.get("deleted", False)

# The TypedSchema object must be re-used, otherwise the refcount will be incorrect and the
# schema will be freed prematurely
typed_schema = self.schemas.get(schema_id)

if typed_schema is None:
self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value)
self.config["compatibility"] = value["compatibilityLevel"]
elif key["keytype"] == "SCHEMA":
if not value:
subject, version = key["subject"], key["version"]
self.log.info("Deleting subject: %r version: %r completely", subject, version)
if subject not in self.subjects:
self.log.error("Subject %s did not exist, should have", subject)
elif version not in self.subjects[subject]["schemas"]:
self.log.error("Version %d for subject %s did not exist, should have", version, subject)
else:
self.subjects[subject]["schemas"].pop(version, None)
return
schema_type = value.get("schemaType", "AVRO")
schema_str = value["schema"]
try:
typed_schema = TypedSchema.parse(schema_type=SchemaType(schema_type), schema_str=schema_str)
except InvalidSchema:
try:
schema_json = json.loads(schema_str)
typed_schema = TypedSchema(
schema_type=SchemaType(schema_type),
schema=schema_json,
schema_str=schema_str,
schema_type=SchemaType(schema_type), schema=schema_json, schema_str=schema_str
)
except JSONDecodeError:
self.log.exception("Invalid schema: %s", schema_str)
self.log.error("Invalid json: %s", value["schema"])
return

if schema_subject not in self.subjects:
self.log.info("Adding first version of subject: %r with no schemas", schema_subject)
self.subjects[schema_subject] = {"schemas": {}}

subjects_schemas = self.subjects[schema_subject]["schemas"]

if schema_version in subjects_schemas:
self.log.info("Updating entry for subject: %r, value: %r", schema_subject, value)
else:
self.log.info("Adding new version of subject: %r, value: %r", schema_subject, value)

subjects_schemas[schema_version] = {
"schema": typed_schema,
"version": schema_version,
"id": schema_id,
"deleted": schema_deleted,
}
with self.id_lock:
self.schemas[schema_id] = typed_schema
self.global_schema_id = max(self.global_schema_id, schema_id)

def handle_msg(self, key: dict, value: Optional[dict]) -> None:
if key["keytype"] == "CONFIG":
self._handle_msg_config(key, value)
elif key["keytype"] == "SCHEMA":
self._handle_msg_schema(key, value)
self.log.debug("Got typed schema %r", typed_schema)
subject = value["subject"]
if subject not in self.subjects:
self.log.info("Adding first version of subject: %r, value: %r", subject, value)
self.subjects[subject] = {
"schemas": {
value["version"]: {
"schema": typed_schema,
"version": value["version"],
"id": value["id"],
"deleted": value.get("deleted", False),
}
}
}
self.log.info("Setting schema_id: %r with schema: %r", value["id"], typed_schema)
self.schemas[value["id"]] = typed_schema
if value["id"] > self.global_schema_id: # Not an existing schema
self.global_schema_id = value["id"]
elif value.get("deleted", False) is True:
self.log.info("Deleting subject: %r, version: %r", subject, value["version"])
if not value["version"] in self.subjects[subject]["schemas"]:
self.schemas[value["id"]] = typed_schema
else:
self.subjects[subject]["schemas"][value["version"]]["deleted"] = True
elif value.get("deleted", False) is False:
self.log.info("Adding new version of subject: %r, value: %r", subject, value)
self.subjects[subject]["schemas"][value["version"]] = {
"schema": typed_schema,
"version": value["version"],
"id": value["id"],
"deleted": value.get("deleted", False),
}
self.log.info("Setting schema_id: %r with schema: %r", value["id"], value["schema"])
with self.id_lock:
self.schemas[value["id"]] = typed_schema
if value["id"] > self.global_schema_id: # Not an existing schema
self.global_schema_id = value["id"]
elif key["keytype"] == "DELETE_SUBJECT":
self._handle_msg_delete_subject(key, value)
self.log.info("Deleting subject: %r, value: %r", value["subject"], value)
if not value["subject"] in self.subjects:
self.log.error("Subject: %r did not exist, should have", value["subject"])
else:
updated_schemas = {
key: self._delete_schema_below_version(schema, value["version"])
for key, schema in self.subjects[value["subject"]]["schemas"].items()
}
self.subjects[value["subject"]]["schemas"] = updated_schemas
elif key["keytype"] == "NOOP": # for spec completeness
pass

@staticmethod
def _delete_schema_below_version(schema, version):
if schema["version"] <= version:
schema["deleted"] = True
return schema

def get_schemas(self, subject, *, include_deleted=False):
if include_deleted:
return self.subjects[subject]["schemas"]
Expand Down
Loading

0 comments on commit 432733f

Please sign in to comment.