diff --git a/karapace/key_format.py b/karapace/key_format.py index e39d4c49e..44d961148 100644 --- a/karapace/key_format.py +++ b/karapace/key_format.py @@ -70,6 +70,9 @@ def format_key( corrected_key["subject"] = key["subject"] if "version" in key: corrected_key["version"] = key["version"] + if "topic" in key: + corrected_key["topic"] = key["topic"] + # Magic is the last element corrected_key["magic"] = key["magic"] return json_encode(corrected_key, binary=True, sort_keys=False, compact=True) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 44afb2b7b..997fdea29 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -429,9 +429,10 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: # LOG.info("Deleting subject: %r, value: %r", subject, value) self.database.delete_subject(subject=subject, version=version) - def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument + def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None: assert isinstance(value, dict) - topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"]) + topic = TopicName(key["topic"]) + skip_validation = bool(value["skip_validation"]) self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation) def _handle_msg_schema_hard_delete(self, key: dict) -> None: diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 311baaa2a..4847fd063 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -476,7 +476,7 @@ def update_require_validation_for_topic( skip_validation: bool, ) -> None: key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0} - value = {"skip_validation": skip_validation, "topic": topic_name} + value = {"skip_validation": skip_validation} self.producer.send_message(key=key, value=value) def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None: diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 373cf00a9..9eb49ca96 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -156,11 +156,10 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None: status=HTTPStatus.SERVICE_UNAVAILABLE, ) else: - url = f"{master_url}{request.url.path}" await self._forward_request_remote( request=request, body=request.json, - url=url, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), content_type=request.get_header("Content-Type"), method=request.method, ) @@ -320,7 +319,7 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, with_request=False, json_body=False, - auth=None, + auth=self._auth, ) self.route( "/topics//disable_validation", @@ -329,7 +328,7 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, with_request=True, json_body=False, - auth=None, + auth=self._auth, ) self.route( "/topics//enable_validation", @@ -338,7 +337,7 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, with_request=True, json_body=False, - auth=None, + auth=self._auth, ) async def close(self) -> None: @@ -581,7 +580,8 @@ async def schemas_get_versions( deleted = request.query.get("deleted", "false").lower() == "true" subject_versions = [] - for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, include_deleted=deleted): + for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, + include_deleted=deleted): subject = subject_version["subject"] if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"): continue @@ -621,8 +621,13 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/config" - await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT") + await self._forward_request_remote( + request=request, + body=body, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), + content_type=content_type, + method="PUT", + ) self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type) @@ -658,7 +663,8 @@ async def config_subject_get( self.r( body={ "error_code": SchemaErrorCodes.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_ERROR_CODE.value, - "message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(subject=subject), + "message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format( + subject=subject), }, content_type=content_type, status=HTTPStatus.NOT_FOUND, @@ -692,9 +698,12 @@ async def config_subject_set( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/config/{subject}" await self._forward_request_remote( - request=request, body=request.json, url=url, content_type=content_type, method="PUT" + request=request, + body=request.json, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), + content_type=content_type, + method="PUT", ) self.r({"compatibility": compatibility_level.value}, content_type) @@ -717,9 +726,12 @@ async def config_subject_delete( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/config/{subject}" await self._forward_request_remote( - request=request, body=request.json, url=url, content_type=content_type, method="PUT" + request=request, + body=request.json, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), + content_type=content_type, + method="PUT", ) self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type) @@ -791,8 +803,14 @@ async def subject_delete( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/subjects/{subject}?permanent={permanent}" - await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") + await self._forward_request_remote( + request=request, + body={}, + url=compute_forwarded_url(master_url=master_url, + request_url=str(request.url) + f"?permanent={permanent}"), + content_type=content_type, + method="DELETE", + ) async def subject_version_get( self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None @@ -894,8 +912,14 @@ async def subject_version_delete( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" - await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE") + await self._forward_request_remote( + request=request, + body={}, + url=compute_forwarded_url(master_url=master_url, + request_url=str(request.url) + f"?permanent={permanent}"), + content_type=content_type, + method="DELETE", + ) async def subject_version_schema_get( self, content_type: str, *, subject: str, version: str, user: User | None = None @@ -1119,7 +1143,8 @@ async def subjects_schema_post( # When checking if schema is already registered, allow unvalidated schema in as # there might be stored schemas that are non-compliant from the past. new_schema = ParsedTypedSchema.parse( - schema_type=schema_type, schema_str=schema_str, references=references, dependencies=new_schema_dependencies + schema_type=schema_type, schema_str=schema_str, references=references, + dependencies=new_schema_dependencies ) except InvalidSchema: self.log.exception("No proper parser found") @@ -1279,10 +1304,22 @@ async def subject_post( elif not master_url: self.no_master_error(content_type) else: - url = f"{master_url}/subjects/{subject}/versions" - await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST") + await self._forward_request_remote( + request=request, + body=body, + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), + content_type=content_type, + method="POST", + ) - async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None: + async def is_topic_requiring_validation( + self, + content_type: str, + *, + topic: str, + user: User | None = None, + ) -> None: + self._check_authorization(user, Operation.Read, "Config:") require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)) reply = {"require_validation": require_validation} self.r(reply, content_type) @@ -1294,7 +1331,10 @@ async def set_topic_require_validation( *, topic: str, skip_validation: bool, + user: User | None = None, ) -> None: + self._check_authorization(user, Operation.Write, "Config:") + topic_name = TopicName(topic) already_skipping_validation = skip_validation and not self.schema_registry.database.is_topic_requiring_validation( @@ -1319,7 +1359,7 @@ async def set_topic_require_validation( await self._forward_request_remote( request=request, body=None, - url=compute_forwarded_url(master_url=master_url, request_url=request.url), + url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)), content_type=content_type, method="POST", )