Skip to content

Commit

Permalink
implemented feedback from the review
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 24, 2023
1 parent 20b7256 commit c8f35f3
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 25 deletions.
3 changes: 3 additions & 0 deletions karapace/key_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def format_key(
corrected_key["subject"] = key["subject"]
if "version" in key:
corrected_key["version"] = key["version"]
if "topic" in key:
corrected_key["topic"] = key["topic"]

# Magic is the last element
corrected_key["magic"] = key["magic"]
return json_encode(corrected_key, binary=True, sort_keys=False, compact=True)
Expand Down
5 changes: 3 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,10 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #
LOG.info("Deleting subject: %r, value: %r", subject, value)
self.database.delete_subject(subject=subject, version=version)

def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument
def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:
assert isinstance(value, dict)
topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"])
topic = TopicName(key["topic"])
skip_validation = bool(value["skip_validation"])
self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def update_require_validation_for_topic(
skip_validation: bool,
) -> None:
key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
value = {"skip_validation": skip_validation, "topic": topic_name}
value = {"skip_validation": skip_validation}
self.producer.send_message(key=key, value=value)

def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
Expand Down
84 changes: 62 additions & 22 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,10 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None:
status=HTTPStatus.SERVICE_UNAVAILABLE,
)
else:
url = f"{master_url}{request.url.path}"
await self._forward_request_remote(
request=request,
body=request.json,
url=url,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=request.get_header("Content-Type"),
method=request.method,
)
Expand Down Expand Up @@ -320,7 +319,7 @@ def _add_schema_registry_routes(self) -> None:
schema_request=True,
with_request=False,
json_body=False,
auth=None,
auth=self._auth,
)
self.route(
"/topics/<topic:path>/disable_validation",
Expand All @@ -329,7 +328,7 @@ def _add_schema_registry_routes(self) -> None:
schema_request=True,
with_request=True,
json_body=False,
auth=None,
auth=self._auth,
)
self.route(
"/topics/<topic:path>/enable_validation",
Expand All @@ -338,7 +337,7 @@ def _add_schema_registry_routes(self) -> None:
schema_request=True,
with_request=True,
json_body=False,
auth=None,
auth=self._auth,
)

async def close(self) -> None:
Expand Down Expand Up @@ -581,7 +580,8 @@ async def schemas_get_versions(

deleted = request.query.get("deleted", "false").lower() == "true"
subject_versions = []
for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int, include_deleted=deleted):
for subject_version in self.schema_registry.get_subject_versions_for_schema(schema_id_int,
include_deleted=deleted):
subject = subject_version["subject"]
if self._auth and not self._auth.check_authorization(user, Operation.Read, f"Subject:{subject}"):
continue
Expand Down Expand Up @@ -621,8 +621,13 @@ async def config_set(self, content_type: str, *, request: HTTPRequest, user: Use
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="PUT")
await self._forward_request_remote(
request=request,
body=body,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="PUT",
)

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)

Expand Down Expand Up @@ -658,7 +663,8 @@ async def config_subject_get(
self.r(
body={
"error_code": SchemaErrorCodes.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_ERROR_CODE.value,
"message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(subject=subject),
"message": SchemaErrorMessages.SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED_FMT.value.format(
subject=subject),
},
content_type=content_type,
status=HTTPStatus.NOT_FOUND,
Expand Down Expand Up @@ -692,9 +698,12 @@ async def config_subject_set(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
await self._forward_request_remote(
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
request=request,
body=request.json,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="PUT",
)

self.r({"compatibility": compatibility_level.value}, content_type)
Expand All @@ -717,9 +726,12 @@ async def config_subject_delete(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
await self._forward_request_remote(
request=request, body=request.json, url=url, content_type=content_type, method="PUT"
request=request,
body=request.json,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="PUT",
)

self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type)
Expand Down Expand Up @@ -791,8 +803,14 @@ async def subject_delete(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
await self._forward_request_remote(
request=request,
body={},
url=compute_forwarded_url(master_url=master_url,
request_url=str(request.url) + f"?permanent={permanent}"),
content_type=content_type,
method="DELETE",
)

async def subject_version_get(
self, content_type: str, *, subject: str, version: str, request: HTTPRequest, user: User | None = None
Expand Down Expand Up @@ -894,8 +912,14 @@ async def subject_version_delete(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="DELETE")
await self._forward_request_remote(
request=request,
body={},
url=compute_forwarded_url(master_url=master_url,
request_url=str(request.url) + f"?permanent={permanent}"),
content_type=content_type,
method="DELETE",
)

async def subject_version_schema_get(
self, content_type: str, *, subject: str, version: str, user: User | None = None
Expand Down Expand Up @@ -1119,7 +1143,8 @@ async def subjects_schema_post(
# When checking if schema is already registered, allow unvalidated schema in as
# there might be stored schemas that are non-compliant from the past.
new_schema = ParsedTypedSchema.parse(
schema_type=schema_type, schema_str=schema_str, references=references, dependencies=new_schema_dependencies
schema_type=schema_type, schema_str=schema_str, references=references,
dependencies=new_schema_dependencies
)
except InvalidSchema:
self.log.exception("No proper parser found")
Expand Down Expand Up @@ -1279,10 +1304,22 @@ async def subject_post(
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")
await self._forward_request_remote(
request=request,
body=body,
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="POST",
)

async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None:
async def is_topic_requiring_validation(
self,
content_type: str,
*,
topic: str,
user: User | None = None,
) -> None:
self._check_authorization(user, Operation.Read, "Config:")
require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic))
reply = {"require_validation": require_validation}
self.r(reply, content_type)
Expand All @@ -1294,7 +1331,10 @@ async def set_topic_require_validation(
*,
topic: str,
skip_validation: bool,
user: User | None = None,
) -> None:
self._check_authorization(user, Operation.Write, "Config:")

topic_name = TopicName(topic)

already_skipping_validation = skip_validation and not self.schema_registry.database.is_topic_requiring_validation(
Expand All @@ -1319,7 +1359,7 @@ async def set_topic_require_validation(
await self._forward_request_remote(
request=request,
body=None,
url=compute_forwarded_url(master_url=master_url, request_url=request.url),
url=compute_forwarded_url(master_url=master_url, request_url=str(request.url)),
content_type=content_type,
method="POST",
)
Expand Down

0 comments on commit c8f35f3

Please sign in to comment.