diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 8e4aae959..3dd2245dd 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -18,8 +18,7 @@ from karapace.utils import json_encode, KarapaceKafkaClient from queue import Queue from threading import Lock, Thread -from typing import Dict, Optional -from weakref import WeakValueDictionary +from typing import Dict import json import logging @@ -105,6 +104,7 @@ def __init__(self, config, master_coordinator=None): self.timeout_ms = 200 self.config = config self.subjects = {} + self.schemas: Dict[int, TypedSchema] = {} self.global_schema_id = 0 self.offset = 0 self.admin_client = None @@ -120,12 +120,7 @@ def __init__(self, config, master_coordinator=None): sentry_config["tags"] = {} self.stats = StatsClient(sentry_config=sentry_config) - # A schema has the same `id` even if registered in two different subjects. This container - # has a weak reference to every schema in use, used to retrieve its `id`. Weak references - # are used to allow for free memory when a schema is cleared from all subjects. - self.schemas: Dict[int, TypedSchema] = WeakValueDictionary() - - def init_consumer(self) -> None: + def init_consumer(self): # Group not set on purpose, all consumers read the same data session_timeout_ms = self.config["session_timeout_ms"] request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]) @@ -149,7 +144,7 @@ def init_consumer(self) -> None: metadata_max_age_ms=self.config["metadata_max_age_ms"], ) - def init_admin_client(self) -> bool: + def init_admin_client(self): try: self.admin_client = KafkaAdminClient( api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, @@ -173,7 +168,7 @@ def init_admin_client(self) -> bool: return False @staticmethod - def get_new_schema_topic(config: dict) -> NewTopic: + def get_new_schema_topic(config): return NewTopic( name=config["topic_name"], num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, @@ -181,7 +176,7 @@ def get_new_schema_topic(config: dict) -> NewTopic: topic_configs={"cleanup.policy": "compact"} ) - def create_schema_topic(self) -> bool: + def create_schema_topic(self): schema_topic = self.get_new_schema_topic(self.config) try: self.log.info("Creating topic: %r", schema_topic) @@ -198,19 +193,21 @@ def create_schema_topic(self) -> bool: time.sleep(5) return False - def get_schema_id(self, new_schema: TypedSchema) -> int: + def get_schema_id(self, new_schema): + with self.id_lock: + schemas = self.schemas.items() + for schema_id, schema in schemas: + if schema == new_schema: + return schema_id with self.id_lock: - for schema_id, schema in self.schemas.items(): - if schema == new_schema: - return schema_id self.global_schema_id += 1 return self.global_schema_id - def close(self) -> None: + def close(self): self.log.info("Closing schema_reader") self.running = False - def run(self) -> None: + def run(self): while self.running: try: if not self.admin_client: @@ -236,7 +233,7 @@ def run(self) -> None: self.stats.unexpected_exception(ex=e, where="schema_reader_exit") self.log.exception("Unexpected exception closing schema reader") - def handle_messages(self) -> None: + def handle_messages(self): raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms) if self.ready is False and raw_msgs == {}: self.ready = True @@ -273,111 +270,104 @@ def handle_messages(self) -> None: if self.ready and add_offsets: self.queue.put(self.offset) - def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: - subject = key.get("subject") - if subject is not None: - if subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", subject) - self.subjects[subject] = {"schemas": {}} - - if not value: - self.log.info("Deleting compatibility config completely for subject: %r", subject) - self.subjects[subject].pop("compatibility", None) + def handle_msg(self, key: dict, value: dict): + if key["keytype"] == "CONFIG": + if "subject" in key and key["subject"] is not None: + if not value: + self.log.info("Deleting compatibility config completely for subject: %r", key["subject"]) + self.subjects[key["subject"]].pop("compatibility", None) + return + self.log.info( + "Setting subject: %r config to: %r, value: %r", key["subject"], value["compatibilityLevel"], value + ) + if not key["subject"] in self.subjects: + self.log.info("Adding first version of subject: %r with no schemas", key["subject"]) + self.subjects[key["subject"]] = {"schemas": {}} + subject_data = self.subjects.get(key["subject"]) + subject_data["compatibility"] = value["compatibilityLevel"] else: - self.log.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) - self.subjects[subject]["compatibility"] = value["compatibilityLevel"] - elif value is not None: - self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) - self.config["compatibility"] = value["compatibilityLevel"] - - def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: # pylint: disable=unused-argument - if value is None: - self.log.error("DELETE_SUBJECT record doesnt have a value, should have") - return - - subject = value["subject"] - if subject not in self.subjects: - self.log.error("Subject: %r did not exist, should have", subject) - else: - self.log.info("Deleting subject: %r, value: %r", subject, value) - version = value["version"] - for schema in self.subjects[subject]["schemas"].values(): - if schema["version"] <= version: - schema["deleted"] = True - - def _handle_msg_schema_hard_delete(self, key: dict) -> None: - subject, version = key["subject"], key["version"] - - if subject not in self.subjects: - self.log.error("Hard delete: Subject %s did not exist, should have", subject) - elif version not in self.subjects[subject]["schemas"]: - self.log.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject) - else: - self.log.info("Hard delete: subject: %r version: %r", subject, version) - self.subjects[subject]["schemas"].pop(version, None) - - def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: - if not value: - self._handle_msg_schema_hard_delete(key) - return - - schema_type = value.get("schemaType", "AVRO") - schema_str = value["schema"] - schema_subject = value["subject"] - schema_id = value["id"] - schema_version = value["version"] - schema_deleted = value.get("deleted", False) - - # The TypedSchema object must be re-used, otherwise the refcount will be incorrect and the - # schema will be freed prematurely - typed_schema = self.schemas.get(schema_id) - - if typed_schema is None: + self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) + self.config["compatibility"] = value["compatibilityLevel"] + elif key["keytype"] == "SCHEMA": + if not value: + subject, version = key["subject"], key["version"] + self.log.info("Deleting subject: %r version: %r completely", subject, version) + if subject not in self.subjects: + self.log.error("Subject %s did not exist, should have", subject) + elif version not in self.subjects[subject]["schemas"]: + self.log.error("Version %d for subject %s did not exist, should have", version, subject) + else: + self.subjects[subject]["schemas"].pop(version, None) + return + schema_type = value.get("schemaType", "AVRO") + schema_str = value["schema"] try: typed_schema = TypedSchema.parse(schema_type=SchemaType(schema_type), schema_str=schema_str) except InvalidSchema: try: schema_json = json.loads(schema_str) typed_schema = TypedSchema( - schema_type=SchemaType(schema_type), - schema=schema_json, - schema_str=schema_str, + schema_type=SchemaType(schema_type), schema=schema_json, schema_str=schema_str ) except JSONDecodeError: - self.log.exception("Invalid schema: %s", schema_str) + self.log.error("Invalid json: %s", value["schema"]) return - - if schema_subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", schema_subject) - self.subjects[schema_subject] = {"schemas": {}} - - subjects_schemas = self.subjects[schema_subject]["schemas"] - - if schema_version in subjects_schemas: - self.log.info("Updating entry for subject: %r, value: %r", schema_subject, value) - else: - self.log.info("Adding new version of subject: %r, value: %r", schema_subject, value) - - subjects_schemas[schema_version] = { - "schema": typed_schema, - "version": schema_version, - "id": schema_id, - "deleted": schema_deleted, - } - with self.id_lock: - self.schemas[schema_id] = typed_schema - self.global_schema_id = max(self.global_schema_id, schema_id) - - def handle_msg(self, key: dict, value: Optional[dict]) -> None: - if key["keytype"] == "CONFIG": - self._handle_msg_config(key, value) - elif key["keytype"] == "SCHEMA": - self._handle_msg_schema(key, value) + self.log.debug("Got typed schema %r", typed_schema) + subject = value["subject"] + if subject not in self.subjects: + self.log.info("Adding first version of subject: %r, value: %r", subject, value) + self.subjects[subject] = { + "schemas": { + value["version"]: { + "schema": typed_schema, + "version": value["version"], + "id": value["id"], + "deleted": value.get("deleted", False), + } + } + } + self.log.info("Setting schema_id: %r with schema: %r", value["id"], typed_schema) + self.schemas[value["id"]] = typed_schema + if value["id"] > self.global_schema_id: # Not an existing schema + self.global_schema_id = value["id"] + elif value.get("deleted", False) is True: + self.log.info("Deleting subject: %r, version: %r", subject, value["version"]) + if not value["version"] in self.subjects[subject]["schemas"]: + self.schemas[value["id"]] = typed_schema + else: + self.subjects[subject]["schemas"][value["version"]]["deleted"] = True + elif value.get("deleted", False) is False: + self.log.info("Adding new version of subject: %r, value: %r", subject, value) + self.subjects[subject]["schemas"][value["version"]] = { + "schema": typed_schema, + "version": value["version"], + "id": value["id"], + "deleted": value.get("deleted", False), + } + self.log.info("Setting schema_id: %r with schema: %r", value["id"], value["schema"]) + with self.id_lock: + self.schemas[value["id"]] = typed_schema + if value["id"] > self.global_schema_id: # Not an existing schema + self.global_schema_id = value["id"] elif key["keytype"] == "DELETE_SUBJECT": - self._handle_msg_delete_subject(key, value) + self.log.info("Deleting subject: %r, value: %r", value["subject"], value) + if not value["subject"] in self.subjects: + self.log.error("Subject: %r did not exist, should have", value["subject"]) + else: + updated_schemas = { + key: self._delete_schema_below_version(schema, value["version"]) + for key, schema in self.subjects[value["subject"]]["schemas"].items() + } + self.subjects[value["subject"]]["schemas"] = updated_schemas elif key["keytype"] == "NOOP": # for spec completeness pass + @staticmethod + def _delete_schema_below_version(schema, version): + if schema["version"] <= version: + schema["deleted"] = True + return schema + def get_schemas(self, subject, *, include_deleted=False): if include_deleted: return self.subjects[subject]["schemas"] diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 757f65568..70827ef38 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -2374,9 +2374,9 @@ async def test_schema_hard_delete_whole_schema(registry_async_client: Client) -> assert res.json()["message"] == f"Subject '{subject}' not found." -async def test_schema_soft_delete_and_recreate(registry_async_client: Client) -> None: - subject = create_subject_name_factory("test_schema_soft_delete_and_recreate")() - schema_name = create_schema_name_factory("test_schema_soft_delete_and_recreate")() +async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> None: + subject = create_subject_name_factory("test_schema_hard_delete_and_recreate")() + schema_name = create_schema_name_factory("test_schema_hard_delete_and_recreate")() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) assert res.status == 200 @@ -2413,35 +2413,10 @@ async def test_schema_soft_delete_and_recreate(registry_async_client: Client) -> assert "id" in res.json() assert schema_id == res.json()["id"], "after soft delete the same schema registered, the same identifier" - -async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> None: - subject_factory = create_subject_name_factory("test_schema_hard_delete_and_recreate") - subject = subject_factory() - schema_name = create_schema_name_factory("test_schema_hard_delete_and_recreate")() - - res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) - assert res.status == 200 - schema = { - "type": "record", - "name": schema_name, - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["first", "second"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - first_schema_id = res.json()["id"] - + # Soft delete whole schema res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 + # Hard delete whole schema res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") assert res.status_code == 200 @@ -2450,91 +2425,14 @@ async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." - # Recreate after hard delete on all subjects frees the schema, and a new id is used - res = await registry_async_client.post( - f"subjects/{subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - msg = "permanent deleted of the schema on all subjects causes a new identifier to be used" - second_schema_id = res.json()["id"] - assert first_schema_id != second_schema_id, msg - - # Register the same schema in another subject, this time the schema should not be freed - subject_keepalive = subject_factory() - res = await registry_async_client.post( - f"subjects/{subject_keepalive}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert second_schema_id == res.json()["id"] - - res = await registry_async_client.delete(f"subjects/{subject}") - assert res.status_code == 200 - - res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") - assert res.status_code == 200 - + # Recreate with same subject after hard delete res = await registry_async_client.post( f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}, ) assert res.status == 200 - msg = "the identifier does not change when the schema is permanent deleted in only one of the subjects" - assert second_schema_id == res.json()["id"], msg - - -async def test_regression_schema_hard_delete_order_must_not_matter(registry_async_client: Client) -> None: - """Regression: A hard delete on the last registered subject would free the schema. - - The correct behavior is to only free the schema after a hard delete on *all* subjects. - """ - subject_factory = create_subject_name_factory("test_schema_hard_delete_regression") - first_subject = subject_factory() - second_subject = subject_factory() - schema_name = create_schema_name_factory("test_schema_hard_delete_regression")() - - res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) - assert res.status == 200 - schema = { - "type": "record", - "name": schema_name, - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["first", "second"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{first_subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - schema_id = res.json()["id"] - - res = await registry_async_client.post( - f"subjects/{second_subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert schema_id == res.json()["id"] - - # Regression: The hard delete is performed on the last subject the schema was registered - res = await registry_async_client.delete(f"subjects/{second_subject}") - assert res.status_code == 200 - res = await registry_async_client.delete(f"subjects/{second_subject}?permanent=true") - assert res.status_code == 200 - - res = await registry_async_client.post( - f"subjects/{second_subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - msg = "the identifier does not change when the schema is permanent deleted in only one of the subjects" - assert schema_id == res.json()["id"], msg + assert "id" in res.json() + assert schema_id == res.json()["id"], "after permanent deleted the same schema registered, the same identifier" async def test_invalid_schema_should_provide_good_error_messages(registry_async_client: Client) -> None: