Skip to content

Commit

Permalink
feat: add configurable validation strategy by topic
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 9, 2023
1 parent 0a57f3b commit 0932649
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- Name strategy to use when storing schemas from the kafka rest proxy service. You can opt between ``name_strategy`` , ``record_name`` and ``topic_record_name``
* - ``name_strategy_validation``
- ``true``
- If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
- If enabled, validate that the given schema is registered under the expected subjects required by the specified name strategy when producing messages from Kafka Rest. Otherwise no validation is performed
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
Expand Down
17 changes: 15 additions & 2 deletions karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
"""
from __future__ import annotations

from cachetools import TTLCache
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
from threading import Lock, RLock
from typing import Iterable, Sequence
from typing import Iterable, MutableMapping, Sequence

import logging

Expand All @@ -32,6 +33,9 @@ def __init__(self) -> None:
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
# this should be a set, but afaik there isn't a TTL set. I'm using this bad modeling
# instead of re-implement the feature in another custom data structure.
self.topic_without_validation: MutableMapping[TopicName, None] = TTLCache(maxsize=10000, ttl=600)

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
Expand Down Expand Up @@ -229,6 +233,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
if schema_version.deleted is False
}

def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
    """Tell whether ``topic_name`` is still subject to validation.

    A topic requires validation unless it is currently present in
    ``self.topic_without_validation``.
    """
    validation_skipped = topic_name in self.topic_without_validation
    return not validation_skipped

def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None:
    """Record whether validation is skipped for ``topic_name``.

    Args:
        topic_name: Topic whose validation override is being changed.
        skip_validation: ``True`` marks the topic as skipping validation,
            ``False`` removes that mark.
    """
    if skip_validation:
        # The mapping is used as a set: only key membership matters.
        self.topic_without_validation[topic_name] = None
    else:
        # The topic may never have been marked, or its entry may already be
        # gone from the mapping; clearing the mark must then be a no-op
        # rather than a KeyError.
        self.topic_without_validation.pop(topic_name, None)

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
Expand Down
13 changes: 10 additions & 3 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from avro.schema import Schema as AvroSchema
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
Expand All @@ -32,7 +31,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, StrEnum, Subject, TopicName
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand All @@ -59,10 +58,11 @@
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


class MessageType(Enum):
class MessageType(StrEnum):
    # Message kinds that ``handle_msg`` compares against to pick a handler.
    config = "CONFIG"
    schema = "SCHEMA"
    delete_subject = "DELETE_SUBJECT"
    # Per-topic validation override; routed to ``_handle_msg_schema_validation``.
    schema_validation = "SCHEMA_VALIDATION"
    # Deliberately ignored by ``handle_msg`` (its branch is a bare ``pass``).
    no_operation = "NOOP"


Expand Down Expand Up @@ -437,6 +437,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #
LOG.info("Deleting subject: %r, value: %r", subject, value)
self.database.delete_subject(subject=subject, version=version)

def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
    """Apply a per-topic validation override message to the database.

    Args:
        key: Message key; not used by this handler.
        value: Message payload; must be a dict carrying ``topic`` and
            ``skip_validation``.
    """
    assert isinstance(value, dict)
    topic = TopicName(value["topic"])
    skip_validation = bool(value["skip_validation"])
    # The database method's keyword is ``skip_validation``; passing the flag
    # under any other name fails with a TypeError.
    self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]

Expand Down Expand Up @@ -540,6 +545,8 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
self._handle_msg_schema(key, value)
elif message_type == MessageType.delete_subject:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.schema_validation:
self._handle_msg_schema_validation(key, value)
elif message_type == MessageType.no_operation:
pass
except ValueError:
Expand Down
20 changes: 17 additions & 3 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from contextlib import AsyncExitStack, closing
from karapace.compatibility import check_compatibility, CompatibilityModes
from karapace.compatibility.jsonschema.checks import is_incompatible
from karapace.config import Config
from karapace.config import Config, NameStrategy
from karapace.dependency import Dependency
from karapace.errors import (
IncompatibleSchema,
Expand All @@ -27,9 +27,9 @@
from karapace.messaging import KarapaceProducer
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_reader import KafkaSchemaReader, MessageType
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName, Version
from typing import Mapping, Sequence

import asyncio
Expand Down Expand Up @@ -466,6 +466,20 @@ def send_schema_message(
value = None
self.producer.send_message(key=key, value=value)

def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
    """Tell whether messages produced to ``topic_name`` must be validated.

    The answer comes straight from the in-memory database, whose method of
    the same name returns a plain ``bool``. It never returns ``None``, so no
    configuration fallback is needed or reachable.
    """
    return self.database.is_topic_requiring_validation(topic_name=topic_name)

def update_require_validation_for_topic(
    self,
    *,
    topic_name: TopicName,
    require_validation: bool,
) -> None:
    """Publish a message that sets the validation requirement for a topic.

    Args:
        topic_name: Topic whose validation behaviour is being changed.
        require_validation: ``True`` to validate messages for the topic,
            ``False`` to skip validation.
    """
    key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
    # The payload field is named ``skip_validation``, i.e. the negation of
    # the flag this method receives.
    value = {"skip_validation": not require_validation, "topic": topic_name}
    self.producer.send_message(key=key, value=value)

def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
key = {"subject": subject, "magic": 0, "keytype": "CONFIG"}
value = {"compatibilityLevel": compatibility_level.value}
Expand Down
84 changes: 82 additions & 2 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from avro.errors import SchemaParseException
from contextlib import AsyncExitStack
from enum import Enum, unique
from functools import partial
from http import HTTPStatus
from karapace.auth import HTTPAuthorizer, Operation, User
from karapace.compatibility import check_compatibility, CompatibilityModes
Expand All @@ -28,20 +29,30 @@
SubjectSoftDeletedException,
VersionNotFoundException,
)
from karapace.karapace import KarapaceBase
from karapace.karapace import empty_response, KarapaceBase
from karapace.protobuf.exception import ProtobufUnresolvedDependencyException
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
from karapace.utils import JSONDecodeError
from typing import Any
from urllib.parse import urlparse

import aiohttp
import async_timeout


def compute_forwarded_url(master_url: str, request_url: str) -> str:
    """Return ``request_url`` re-targeted at the master.

    The scheme and network location are taken from ``master_url``; every
    other component of ``request_url`` is kept as is.
    """
    master = urlparse(master_url)
    forwarded = urlparse(request_url)._replace(scheme=master.scheme, netloc=master.netloc)
    return forwarded.geturl()


@unique
class SchemaErrorCodes(Enum):
HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value
Expand Down Expand Up @@ -301,6 +312,33 @@ def _add_schema_registry_routes(self) -> None:
json_body=False,
auth=self._auth,
)
self.route(
"/topic/<topic:path>/require_validation",
callback=self.is_topic_requiring_validation,
method="GET",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)
self.route(
"/topic/<topic:path>/disable_validation",
callback=partial(self.set_topic_require_validation, require_validation=False),
method="POST",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)
self.route(
"/topic/<topic:path>/enable_validation",
callback=partial(self.set_topic_require_validation, require_validation=True),
method="POST",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)

async def close(self) -> None:
async with AsyncExitStack() as stack:
Expand Down Expand Up @@ -1238,6 +1276,48 @@ async def subject_post(
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def is_topic_requiring_validation(
    self,
    content_type: str,
    request: HTTPRequest | None = None,  # pylint: disable=unused-argument
    *,
    topic: str,
) -> None:
    """Reply with whether messages produced to ``topic`` must be validated.

    Args:
        content_type: Content type used for the reply.
        request: Unused. Accepted because the route for this handler is
            registered with ``with_request=True``.
            NOTE(review): presumably the framework passes ``request`` in that
            case, as it does for the POST handlers; confirm.
        topic: Topic name taken from the URL path.
    """
    # The registry answers with a plain bool, so it goes into the reply
    # unchanged; there is no enum ``.value`` to unwrap.
    require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic))
    reply = {"require_validation": require_validation}
    self.r(reply, content_type)

async def set_topic_require_validation(
    self,
    content_type: str,
    request: HTTPRequest,
    *,
    topic: str,
    require_validation: bool,
) -> None:
    """Enable or disable validation for ``topic``.

    When the local database already reflects the requested state nothing is
    written. Otherwise the master publishes the change, and a non-master
    node forwards the request to the master.

    Args:
        content_type: Content type used when forwarding the request.
        request: Incoming request; its URL is re-targeted at the master when
            forwarding.
        topic: Topic name taken from the URL path.
        require_validation: Desired state for the topic.
    """
    topic_name = TopicName(topic)
    currently_required = self.schema_registry.database.is_topic_requiring_validation(topic_name=topic_name)

    # Only a real state change needs to reach the master. The request is a
    # no-op exactly when the current state equals the requested one.
    if currently_required != require_validation:
        are_we_master, master_url = await self.schema_registry.get_master()

        if not are_we_master:
            await self._forward_request_remote(
                request=request,
                body=None,
                url=compute_forwarded_url(master_url=master_url, request_url=request.url),
                content_type=content_type,
                method="POST",
            )
            return

        self.schema_registry.update_require_validation_for_topic(
            topic_name=topic_name,
            require_validation=require_validation,
        )

    # Reached for both the no-op case and a successful update on the master.
    # Structured so the flow is correct whether or not ``empty_response``
    # returns to its caller.
    empty_response()

def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
schema_id = self.schema_registry.database.get_schema_id_if_exists(
subject=subject, schema=schema, include_deleted=include_deleted
Expand Down
7 changes: 6 additions & 1 deletion karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
from google.protobuf.message import DecodeError
from jsonschema import ValidationError
from karapace.client import Client
from karapace.config import NameStrategy
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.typing import ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.utils import json_decode, json_encode
from typing import Any, Callable, MutableMapping
from urllib.parse import quote
Expand Down Expand Up @@ -96,6 +97,10 @@ def topic_record_name_strategy(
return Subject(f"{topic_name}-{validated_record_name}")


def no_validation_strategy(topic_name: str, record_name: str) -> str:
    """Build the subject name used when no validation is applied.

    The result is the topic-record-name subject prefixed with
    ``__auto_registration_anonymous_``.
    """
    topic_record_subject = topic_record_name_strategy(topic_name, record_name)
    return f"__auto_registration_anonymous_{topic_record_subject}"


NAME_STRATEGIES = {
NameStrategy.topic_name: topic_name_strategy,
NameStrategy.record_name: record_name_strategy,
Expand Down
Loading

0 comments on commit 0932649

Please sign in to comment.