From ec6ea455934250e8c2c41d7275f5dbeda4f6ebd7 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:44 -0300 Subject: [PATCH 01/10] Revert "schema_reader: fix refcount issue" This reverts commit 45cfb7085f3708af9d8000d5093a257175bd8105. --- karapace/schema_reader.py | 26 +++++----------- tests/integration/test_schema.py | 53 -------------------------------- 2 files changed, 8 insertions(+), 71 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 8e4aae959..4dc99637e 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -327,25 +327,15 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: schema_id = value["id"] schema_version = value["version"] schema_deleted = value.get("deleted", False) - - # The TypedSchema object must be re-used, otherwise the refcount will be incorrect and the - # schema will be freed prematurely - typed_schema = self.schemas.get(schema_id) - - if typed_schema is None: + try: + typed_schema = TypedSchema.parse(schema_type=SchemaType(schema_type), schema_str=schema_str) + except InvalidSchema: try: - typed_schema = TypedSchema.parse(schema_type=SchemaType(schema_type), schema_str=schema_str) - except InvalidSchema: - try: - schema_json = json.loads(schema_str) - typed_schema = TypedSchema( - schema_type=SchemaType(schema_type), - schema=schema_json, - schema_str=schema_str, - ) - except JSONDecodeError: - self.log.exception("Invalid schema: %s", schema_str) - return + schema_json = json.loads(schema_str) + typed_schema = TypedSchema(schema_type=SchemaType(schema_type), schema=schema_json, schema_str=schema_str) + except JSONDecodeError: + self.log.error("Invalid json: %s", schema_str) + return if schema_subject not in self.subjects: self.log.info("Adding first version of subject: %r with no schemas", schema_subject) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 757f65568..373f1b308 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -2484,59 +2484,6 @@ async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> assert second_schema_id == res.json()["id"], msg -async def test_regression_schema_hard_delete_order_must_not_matter(registry_async_client: Client) -> None: - """Regression: A hard delete on the last registered subject would free the schema. - - The correct behavior is to only free the schema after a hard delete on *all* subjects. - """ - subject_factory = create_subject_name_factory("test_schema_hard_delete_regression") - first_subject = subject_factory() - second_subject = subject_factory() - schema_name = create_schema_name_factory("test_schema_hard_delete_regression")() - - res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) - assert res.status == 200 - schema = { - "type": "record", - "name": schema_name, - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["first", "second"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{first_subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - schema_id = res.json()["id"] - - res = await registry_async_client.post( - f"subjects/{second_subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert schema_id == res.json()["id"] - - # Regression: The hard delete is performed on the last subject the schema was registered - res = await registry_async_client.delete(f"subjects/{second_subject}") - assert res.status_code == 200 - res = await registry_async_client.delete(f"subjects/{second_subject}?permanent=true") - assert res.status_code == 200 - - res = await registry_async_client.post( - f"subjects/{second_subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - msg = "the identifier does not change when the schema is permanent deleted in only one of the subjects" - assert schema_id == res.json()["id"], msg - - async def test_invalid_schema_should_provide_good_error_messages(registry_async_client: Client) -> None: """The user should receive an informative error message when the format is invalid""" subject_name_factory = create_subject_name_factory("test_schema_subject_post_invalid_data") From 3f8a5271951e1e3aba809476776c845a350e5a21 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:45 -0300 Subject: [PATCH 02/10] Revert "schema_reader: simplified subject delete" This reverts commit 411a3e6163c74f782efbb49e9c6742fd66ed2574. --- karapace/schema_reader.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 4dc99637e..633cace06 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -300,10 +300,11 @@ def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: self.log.error("Subject: %r did not exist, should have", subject) else: self.log.info("Deleting subject: %r, value: %r", subject, value) - version = value["version"] - for schema in self.subjects[subject]["schemas"].values(): - if schema["version"] <= version: - schema["deleted"] = True + updated_schemas = { + key: self._delete_schema_below_version(schema, value["version"]) + for key, schema in self.subjects[subject]["schemas"].items() + } + self.subjects[value["subject"]]["schemas"] = updated_schemas def _handle_msg_schema_hard_delete(self, key: dict) -> None: subject, version = key["subject"], key["version"] @@ -368,6 +369,12 @@ def handle_msg(self, key: dict, value: Optional[dict]) -> None: elif key["keytype"] == "NOOP": # for spec completeness pass + @staticmethod + def _delete_schema_below_version(schema, version): + if schema["version"] <= version: + schema["deleted"] = True + return schema + def get_schemas(self, subject, *, include_deleted=False): if include_deleted: return self.subjects[subject]["schemas"] From 551ece76997bb9e56d3864deb1f8b5f080296e78 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:46 -0300 Subject: [PATCH 03/10] Revert "schema_reader: fix memory leak" This reverts commit d19c44ecd64b0db00dff4cda7fd106c063539b5a. --- karapace/schema_reader.py | 7 +--- tests/integration/test_schema.py | 65 ++++---------------------------- 2 files changed, 9 insertions(+), 63 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 633cace06..3eea261a6 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -19,7 +19,6 @@ from queue import Queue from threading import Lock, Thread from typing import Dict, Optional -from weakref import WeakValueDictionary import json import logging @@ -105,6 +104,7 @@ def __init__(self, config, master_coordinator=None): self.timeout_ms = 200 self.config = config self.subjects = {} + self.schemas: Dict[int, TypedSchema] = {} self.global_schema_id = 0 self.offset = 0 self.admin_client = None @@ -120,11 +120,6 @@ def __init__(self, config, master_coordinator=None): sentry_config["tags"] = {} self.stats = StatsClient(sentry_config=sentry_config) - # A schema has the same `id` even if registered in two different subjects. This container - # has a weak reference to every schema in use, used to retrieve its `id`. Weak references - # are used to allow for free memory when a schema is cleared from all subjects. - self.schemas: Dict[int, TypedSchema] = WeakValueDictionary() - def init_consumer(self) -> None: # Group not set on purpose, all consumers read the same data session_timeout_ms = self.config["session_timeout_ms"] diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 373f1b308..70827ef38 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -2374,9 +2374,9 @@ async def test_schema_hard_delete_whole_schema(registry_async_client: Client) -> assert res.json()["message"] == f"Subject '{subject}' not found." -async def test_schema_soft_delete_and_recreate(registry_async_client: Client) -> None: - subject = create_subject_name_factory("test_schema_soft_delete_and_recreate")() - schema_name = create_schema_name_factory("test_schema_soft_delete_and_recreate")() +async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> None: + subject = create_subject_name_factory("test_schema_hard_delete_and_recreate")() + schema_name = create_schema_name_factory("test_schema_hard_delete_and_recreate")() res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) assert res.status == 200 @@ -2413,35 +2413,10 @@ async def test_schema_soft_delete_and_recreate(registry_async_client: Client) -> assert "id" in res.json() assert schema_id == res.json()["id"], "after soft delete the same schema registered, the same identifier" - -async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> None: - subject_factory = create_subject_name_factory("test_schema_hard_delete_and_recreate") - subject = subject_factory() - schema_name = create_schema_name_factory("test_schema_hard_delete_and_recreate")() - - res = await registry_async_client.put("config", json={"compatibility": "BACKWARD"}) - assert res.status == 200 - schema = { - "type": "record", - "name": schema_name, - "fields": [{ - "type": { - "type": "enum", - "name": "enumtest", - "symbols": ["first", "second"], - }, - "name": "faa", - }] - } - res = await registry_async_client.post( - f"subjects/{subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - first_schema_id = res.json()["id"] - + # Soft delete whole schema res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 + # Hard delete whole schema res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") assert res.status_code == 200 @@ -2450,38 +2425,14 @@ async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> assert res.json()["error_code"] == 40401 assert res.json()["message"] == f"Subject '{subject}' not found." - # Recreate after hard delete on all subjects frees the schema, and a new id is used + # Recreate with same subject after hard delete res = await registry_async_client.post( f"subjects/{subject}/versions", json={"schema": jsonlib.dumps(schema)}, ) assert res.status == 200 - msg = "permanent deleted of the schema on all subjects causes a new identifier to be used" - second_schema_id = res.json()["id"] - assert first_schema_id != second_schema_id, msg - - # Register the same schema in another subject, this time the schema should not be freed - subject_keepalive = subject_factory() - res = await registry_async_client.post( - f"subjects/{subject_keepalive}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - assert second_schema_id == res.json()["id"] - - res = await registry_async_client.delete(f"subjects/{subject}") - assert res.status_code == 200 - - res = await registry_async_client.delete(f"subjects/{subject}?permanent=true") - assert res.status_code == 200 - - res = await registry_async_client.post( - f"subjects/{subject}/versions", - json={"schema": jsonlib.dumps(schema)}, - ) - assert res.status == 200 - msg = "the identifier does not change when the schema is permanent deleted in only one of the subjects" - assert second_schema_id == res.json()["id"], msg + assert "id" in res.json() + assert schema_id == res.json()["id"], "after permanent deleted the same schema registered, the same identifier" async def test_invalid_schema_should_provide_good_error_messages(registry_async_client: Client) -> None: From 345161a377336dc7025ae9be51a72151f367aecb Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:47 -0300 Subject: [PATCH 04/10] Revert "schema_reader: simplified message handling" This reverts commit 7474adfec40aec365cc76f328ec2aaee2dc52f5f. --- karapace/schema_reader.py | 54 +++++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 3eea261a6..cd1281f40 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -332,27 +332,43 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: except JSONDecodeError: self.log.error("Invalid json: %s", schema_str) return - + self.log.debug("Got typed schema %r", typed_schema) if schema_subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", schema_subject) - self.subjects[schema_subject] = {"schemas": {}} - - subjects_schemas = self.subjects[schema_subject]["schemas"] - - if schema_version in subjects_schemas: - self.log.info("Updating entry for subject: %r, value: %r", schema_subject, value) - else: + self.log.info("Adding first version of subject: %r, value: %r", schema_subject, value) + self.subjects[schema_subject] = { + "schemas": { + schema_version: { + "schema": typed_schema, + "version": schema_version, + "id": schema_id, + "deleted": schema_deleted, + } + } + } + self.log.info("Setting schema_id: %r with schema: %r", schema_id, typed_schema) + with self.id_lock: + self.schemas[schema_id] = typed_schema + self.global_schema_id = max(self.global_schema_id, schema_id) + elif schema_deleted is True: + self.log.info("Soft delete: subject: %r, version: %r", schema_subject, schema_version) + if schema_version not in self.subjects[schema_subject]["schemas"]: + with self.id_lock: + self.schemas[schema_id] = typed_schema + self.global_schema_id = max(self.global_schema_id, schema_id) + else: + self.subjects[schema_subject]["schemas"][schema_version]["deleted"] = True + elif schema_deleted is False: self.log.info("Adding new version of subject: %r, value: %r", schema_subject, value) - - subjects_schemas[schema_version] = { - "schema": typed_schema, - "version": schema_version, - "id": schema_id, - "deleted": schema_deleted, - } - with self.id_lock: - self.schemas[schema_id] = typed_schema - self.global_schema_id = max(self.global_schema_id, schema_id) + self.subjects[schema_subject]["schemas"][schema_version] = { + "schema": typed_schema, + "version": schema_version, + "id": schema_id, + "deleted": schema_deleted, + } + self.log.info("Setting schema_id: %r with schema: %r", schema_id, schema_str) + with self.id_lock: + self.schemas[schema_id] = typed_schema + self.global_schema_id = max(self.global_schema_id, schema_id) def handle_msg(self, key: dict, value: Optional[dict]) -> None: if key["keytype"] == "CONFIG": From e2bee6f392ce4cf967a9cf870d029f58504eb901 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:48 -0300 Subject: [PATCH 05/10] Revert "schema_reader: separated logic for schema hard delete" This reverts commit aa29f10a90b237af136fa6c3c91e6500f9ae9f62. --- karapace/schema_reader.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index cd1281f40..dde56db5c 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -301,22 +301,17 @@ def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: } self.subjects[value["subject"]]["schemas"] = updated_schemas - def _handle_msg_schema_hard_delete(self, key: dict) -> None: - subject, version = key["subject"], key["version"] - - if subject not in self.subjects: - self.log.error("Hard delete: Subject %s did not exist, should have", subject) - elif version not in self.subjects[subject]["schemas"]: - self.log.error("Hard delete: Version %d for subject %s did not exist, should have", version, subject) - else: - self.log.info("Hard delete: subject: %r version: %r", subject, version) - self.subjects[subject]["schemas"].pop(version, None) - def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: if not value: - self._handle_msg_schema_hard_delete(key) + subject, version = key["subject"], key["version"] + self.log.info("Deleting subject: %r version: %r completely", subject, version) + if subject not in self.subjects: + self.log.error("Subject %s did not exist, should have", subject) + elif version not in self.subjects[subject]["schemas"]: + self.log.error("Version %d for subject %s did not exist, should have", version, subject) + else: + self.subjects[subject]["schemas"].pop(version, None) return - schema_type = value.get("schemaType", "AVRO") schema_str = value["schema"] schema_subject = value["subject"] @@ -350,7 +345,7 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: self.schemas[schema_id] = typed_schema self.global_schema_id = max(self.global_schema_id, schema_id) elif schema_deleted is True: - self.log.info("Soft delete: subject: %r, version: %r", schema_subject, schema_version) + self.log.info("Deleting subject: %r, version: %r", schema_subject, schema_version) if schema_version not in self.subjects[schema_subject]["schemas"]: with self.id_lock: self.schemas[schema_id] = typed_schema From 7e8824a6b71197c8a870f74e3bcc5aacb8e838d4 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:49 -0300 Subject: [PATCH 06/10] Revert "schema_reader: holding lock while manipulating schemas" This reverts commit d757cebf5b95e82dc1bd43010b1d2d6ffd4da12a. --- karapace/schema_reader.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index dde56db5c..63861be9a 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -195,9 +195,11 @@ def create_schema_topic(self) -> bool: def get_schema_id(self, new_schema: TypedSchema) -> int: with self.id_lock: - for schema_id, schema in self.schemas.items(): - if schema == new_schema: - return schema_id + schemas = self.schemas.items() + for schema_id, schema in schemas: + if schema == new_schema: + return schema_id + with self.id_lock: self.global_schema_id += 1 return self.global_schema_id @@ -341,15 +343,13 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: } } self.log.info("Setting schema_id: %r with schema: %r", schema_id, typed_schema) - with self.id_lock: - self.schemas[schema_id] = typed_schema - self.global_schema_id = max(self.global_schema_id, schema_id) + self.schemas[schema_id] = typed_schema + if schema_id > self.global_schema_id: # Not an existing schema + self.global_schema_id = schema_id elif schema_deleted is True: self.log.info("Deleting subject: %r, version: %r", schema_subject, schema_version) if schema_version not in self.subjects[schema_subject]["schemas"]: - with self.id_lock: - self.schemas[schema_id] = typed_schema - self.global_schema_id = max(self.global_schema_id, schema_id) + self.schemas[schema_id] = typed_schema else: self.subjects[schema_subject]["schemas"][schema_version]["deleted"] = True elif schema_deleted is False: @@ -363,7 +363,8 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: self.log.info("Setting schema_id: %r with schema: %r", schema_id, schema_str) with self.id_lock: self.schemas[schema_id] = typed_schema - self.global_schema_id = max(self.global_schema_id, schema_id) + if schema_id > self.global_schema_id: # Not an existing schema + self.global_schema_id = schema_id def handle_msg(self, key: dict, value: Optional[dict]) -> None: if key["keytype"] == "CONFIG": From c213542369eaaa5f8e6b5568c93cfc99a40b6839 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:50 -0300 Subject: [PATCH 07/10] Revert "schema_reader: isolated SUBJECT message handler" This reverts commit 78d6e1401b139061f03ec3f8625d7535c318cebb. --- karapace/schema_reader.py | 124 ++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 64 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 63861be9a..0f6e55e50 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -303,74 +303,70 @@ def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: } self.subjects[value["subject"]]["schemas"] = updated_schemas - def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: - if not value: - subject, version = key["subject"], key["version"] - self.log.info("Deleting subject: %r version: %r completely", subject, version) - if subject not in self.subjects: - self.log.error("Subject %s did not exist, should have", subject) - elif version not in self.subjects[subject]["schemas"]: - self.log.error("Version %d for subject %s did not exist, should have", version, subject) - else: - self.subjects[subject]["schemas"].pop(version, None) - return - schema_type = value.get("schemaType", "AVRO") - schema_str = value["schema"] - schema_subject = value["subject"] - schema_id = value["id"] - schema_version = value["version"] - schema_deleted = value.get("deleted", False) - try: - typed_schema = TypedSchema.parse(schema_type=SchemaType(schema_type), schema_str=schema_str) - except InvalidSchema: - try: - schema_json = json.loads(schema_str) - typed_schema = TypedSchema(schema_type=SchemaType(schema_type), schema=schema_json, schema_str=schema_str) - except JSONDecodeError: - self.log.error("Invalid json: %s", schema_str) - return - self.log.debug("Got typed schema %r", typed_schema) - if schema_subject not in self.subjects: - self.log.info("Adding first version of subject: %r, value: %r", schema_subject, value) - self.subjects[schema_subject] = { - "schemas": { - schema_version: { - "schema": typed_schema, - "version": schema_version, - "id": schema_id, - "deleted": schema_deleted, - } - } - } - self.log.info("Setting schema_id: %r with schema: %r", schema_id, typed_schema) - self.schemas[schema_id] = typed_schema - if schema_id > self.global_schema_id: # Not an existing schema - self.global_schema_id = schema_id - elif schema_deleted is True: - self.log.info("Deleting subject: %r, version: %r", schema_subject, schema_version) - if schema_version not in self.subjects[schema_subject]["schemas"]: - self.schemas[schema_id] = typed_schema - else: - self.subjects[schema_subject]["schemas"][schema_version]["deleted"] = True - elif schema_deleted is False: - self.log.info("Adding new version of subject: %r, value: %r", schema_subject, value) - self.subjects[schema_subject]["schemas"][schema_version] = { - "schema": typed_schema, - "version": schema_version, - "id": schema_id, - "deleted": schema_deleted, - } - self.log.info("Setting schema_id: %r with schema: %r", schema_id, schema_str) - with self.id_lock: - self.schemas[schema_id] = typed_schema - if schema_id > self.global_schema_id: # Not an existing schema - self.global_schema_id = schema_id - def handle_msg(self, key: dict, value: Optional[dict]) -> None: if key["keytype"] == "CONFIG": self._handle_msg_config(key, value) elif key["keytype"] == "SCHEMA": - self._handle_msg_schema(key, value) + if not value: + subject, version = key["subject"], key["version"] + self.log.info("Deleting subject: %r version: %r completely", subject, version) + if subject not in self.subjects: + self.log.error("Subject %s did not exist, should have", subject) + elif version not in self.subjects[subject]["schemas"]: + self.log.error("Version %d for subject %s did not exist, should have", version, subject) + else: + self.subjects[subject]["schemas"].pop(version, None) + return + schema_type = value.get("schemaType", "AVRO") + schema_str = value["schema"] + try: + typed_schema = TypedSchema.parse(schema_type=SchemaType(schema_type), schema_str=schema_str) + except InvalidSchema: + try: + schema_json = json.loads(schema_str) + typed_schema = TypedSchema( + schema_type=SchemaType(schema_type), schema=schema_json, schema_str=schema_str + ) + except JSONDecodeError: + self.log.error("Invalid json: %s", value["schema"]) + return + self.log.debug("Got typed schema %r", typed_schema) + subject = value["subject"] + if subject not in self.subjects: + self.log.info("Adding first version of subject: %r, value: %r", subject, value) + self.subjects[subject] = { + "schemas": { + value["version"]: { + "schema": typed_schema, + "version": value["version"], + "id": value["id"], + "deleted": value.get("deleted", False), + } + } + } + self.log.info("Setting schema_id: %r with schema: %r", value["id"], typed_schema) + self.schemas[value["id"]] = typed_schema + if value["id"] > self.global_schema_id: # Not an existing schema + self.global_schema_id = value["id"] + elif value.get("deleted", False) is True: + self.log.info("Deleting subject: %r, version: %r", subject, value["version"]) + if not value["version"] in self.subjects[subject]["schemas"]: + self.schemas[value["id"]] = typed_schema + else: + self.subjects[subject]["schemas"][value["version"]]["deleted"] = True + elif value.get("deleted", False) is False: + self.log.info("Adding new version of subject: %r, value: %r", subject, value) + self.subjects[subject]["schemas"][value["version"]] = { + "schema": typed_schema, + "version": value["version"], + "id": value["id"], + "deleted": value.get("deleted", False), + } + self.log.info("Setting schema_id: %r with schema: %r", value["id"], value["schema"]) + with self.id_lock: + self.schemas[value["id"]] = typed_schema + if value["id"] > self.global_schema_id: # Not an existing schema + self.global_schema_id = value["id"] elif key["keytype"] == "DELETE_SUBJECT": self._handle_msg_delete_subject(key, value) elif key["keytype"] == "NOOP": # for spec completeness From e3aa68ab826bd2926854485fad22c2a87d4afbcb Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:51 -0300 Subject: [PATCH 08/10] Revert "schema_reader: isolated DELETE_SUBJECT message handler" This reverts commit 87119d42fe6b77ad58af534a9ad9f71c730cadec. --- karapace/schema_reader.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 0f6e55e50..b4db1349c 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -287,22 +287,6 @@ def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) self.config["compatibility"] = value["compatibilityLevel"] - def _handle_msg_delete_subject(self, key: dict, value: Optional[dict]) -> None: # pylint: disable=unused-argument - if value is None: - self.log.error("DELETE_SUBJECT record doesnt have a value, should have") - return - - subject = value["subject"] - if subject not in self.subjects: - self.log.error("Subject: %r did not exist, should have", subject) - else: - self.log.info("Deleting subject: %r, value: %r", subject, value) - updated_schemas = { - key: self._delete_schema_below_version(schema, value["version"]) - for key, schema in self.subjects[subject]["schemas"].items() - } - self.subjects[value["subject"]]["schemas"] = updated_schemas - def handle_msg(self, key: dict, value: Optional[dict]) -> None: if key["keytype"] == "CONFIG": self._handle_msg_config(key, value) @@ -367,8 +351,16 @@ def handle_msg(self, key: dict, value: Optional[dict]) -> None: self.schemas[value["id"]] = typed_schema if value["id"] > self.global_schema_id: # Not an existing schema self.global_schema_id = value["id"] - elif key["keytype"] == "DELETE_SUBJECT": - self._handle_msg_delete_subject(key, value) + elif key["keytype"] == "DELETE_SUBJECT" and value: + self.log.info("Deleting subject: %r, value: %r", value["subject"], value) + if not value["subject"] in self.subjects: + self.log.error("Subject: %r did not exist, should have", value["subject"]) + else: + updated_schemas = { + key: self._delete_schema_below_version(schema, value["version"]) + for key, schema in self.subjects[value["subject"]]["schemas"].items() + } + self.subjects[value["subject"]]["schemas"] = updated_schemas elif key["keytype"] == "NOOP": # for spec completeness pass From 423d6c87436f1cb691f1f7654f9b521d12225456 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 14:21:52 -0300 Subject: [PATCH 09/10] Revert "schema_reader: isolated CONFIG message handler" This reverts commit 957df487ca5806cfcb514800ed017a70170aa3f0. --- karapace/schema_reader.py | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b4db1349c..5adcd6f08 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -270,26 +270,24 @@ def handle_messages(self) -> None: if self.ready and add_offsets: self.queue.put(self.offset) - def _handle_msg_config(self, key: dict, value: Optional[dict]) -> None: - subject = key.get("subject") - if subject is not None: - if subject not in self.subjects: - self.log.info("Adding first version of subject: %r with no schemas", subject) - self.subjects[subject] = {"schemas": {}} - - if not value: - self.log.info("Deleting compatibility config completely for subject: %r", subject) - self.subjects[subject].pop("compatibility", None) - else: - self.log.info("Setting subject: %r config to: %r, value: %r", subject, value["compatibilityLevel"], value) - self.subjects[subject]["compatibility"] = value["compatibilityLevel"] - elif value is not None: - self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) - self.config["compatibility"] = value["compatibilityLevel"] - def handle_msg(self, key: dict, value: Optional[dict]) -> None: - if key["keytype"] == "CONFIG": - self._handle_msg_config(key, value) + if key["keytype"] == "CONFIG" and value: + if "subject" in key and key["subject"] is not None: + if not value: + self.log.info("Deleting compatibility config completely for subject: %r", key["subject"]) + self.subjects[key["subject"]].pop("compatibility", None) + return + self.log.info( + "Setting subject: %r config to: %r, value: %r", key["subject"], value["compatibilityLevel"], value + ) + if not key["subject"] in self.subjects: + self.log.info("Adding first version of subject: %r with no schemas", key["subject"]) + self.subjects[key["subject"]] = {"schemas": {}} + subject_data = self.subjects.get(key["subject"]) + subject_data["compatibility"] = value["compatibilityLevel"] + else: + self.log.info("Setting global config to: %r, value: %r", value["compatibilityLevel"], value) + self.config["compatibility"] = value["compatibilityLevel"] elif key["keytype"] == "SCHEMA": if not value: subject, version = key["subject"], key["version"] From dc751ce1e05c92d8694a5f1cb954ed4008459013 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 16 Dec 2021 15:47:44 -0300 Subject: [PATCH 10/10] Revert "schema_reader: added typing" This reverts commit 3087b9a35ee312b171e6fd4ab5e14695b79cf618. --- karapace/schema_reader.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 5adcd6f08..3dd2245dd 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -18,7 +18,7 @@ from karapace.utils import json_encode, KarapaceKafkaClient from queue import Queue from threading import Lock, Thread -from typing import Dict, Optional +from typing import Dict import json import logging @@ -120,7 +120,7 @@ def __init__(self, config, master_coordinator=None): sentry_config["tags"] = {} self.stats = StatsClient(sentry_config=sentry_config) - def init_consumer(self) -> None: + def init_consumer(self): # Group not set on purpose, all consumers read the same data session_timeout_ms = self.config["session_timeout_ms"] request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]) @@ -144,7 +144,7 @@ def init_consumer(self) -> None: metadata_max_age_ms=self.config["metadata_max_age_ms"], ) - def init_admin_client(self) -> bool: + def init_admin_client(self): try: self.admin_client = KafkaAdminClient( api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS, @@ -168,7 +168,7 @@ def init_admin_client(self) -> bool: return False @staticmethod - def get_new_schema_topic(config: dict) -> NewTopic: + def get_new_schema_topic(config): return NewTopic( name=config["topic_name"], num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, @@ -176,7 +176,7 @@ def get_new_schema_topic(config: dict) -> NewTopic: topic_configs={"cleanup.policy": "compact"} ) - def create_schema_topic(self) -> bool: + def create_schema_topic(self): schema_topic = self.get_new_schema_topic(self.config) try: self.log.info("Creating topic: %r", schema_topic) @@ -193,7 +193,7 @@ def create_schema_topic(self) -> bool: time.sleep(5) return False - def get_schema_id(self, new_schema: TypedSchema) -> int: + def get_schema_id(self, new_schema): with self.id_lock: schemas = self.schemas.items() for schema_id, schema in schemas: @@ -203,11 +203,11 @@ def get_schema_id(self, new_schema: TypedSchema) -> int: self.global_schema_id += 1 return self.global_schema_id - def close(self) -> None: + def close(self): self.log.info("Closing schema_reader") self.running = False - def run(self) -> None: + def run(self): while self.running: try: if not self.admin_client: @@ -233,7 +233,7 @@ def run(self) -> None: self.stats.unexpected_exception(ex=e, where="schema_reader_exit") self.log.exception("Unexpected exception closing schema reader") - def handle_messages(self) -> None: + def handle_messages(self): raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms) if self.ready is False and raw_msgs == {}: self.ready = True @@ -270,8 +270,8 @@ def handle_messages(self) -> None: if self.ready and add_offsets: self.queue.put(self.offset) - def handle_msg(self, key: dict, value: Optional[dict]) -> None: - if key["keytype"] == "CONFIG" and value: + def handle_msg(self, key: dict, value: dict): + if key["keytype"] == "CONFIG": if "subject" in key and key["subject"] is not None: if not value: self.log.info("Deleting compatibility config completely for subject: %r", key["subject"]) @@ -349,7 +349,7 @@ def handle_msg(self, key: dict, value: Optional[dict]) -> None: self.schemas[value["id"]] = typed_schema if value["id"] > self.global_schema_id: # Not an existing schema self.global_schema_id = value["id"] - elif key["keytype"] == "DELETE_SUBJECT" and value: + elif key["keytype"] == "DELETE_SUBJECT": self.log.info("Deleting subject: %r, value: %r", value["subject"], value) if not value["subject"] in self.subjects: self.log.error("Subject: %r did not exist, should have", value["subject"])