Skip to content

Commit

Permalink
feat: enable users to disable validation only on specific topics
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 9, 2023
1 parent 0a57f3b commit 65bf208
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 33 deletions.
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- Name strategy to use when storing schemas from the kafka rest proxy service. You can opt between ``name_strategy`` , ``record_name`` and ``topic_record_name``
* - ``name_strategy_validation``
- ``true``
- If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
- If enabled, validate that given schema is registered under the expected subjects requireds by the specified name strategy when producing messages from Kafka Rest. Otherwise no validation are performed
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
Expand Down
12 changes: 11 additions & 1 deletion karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dataclasses import dataclass, field
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
from threading import Lock, RLock
from typing import Iterable, Sequence

Expand All @@ -32,6 +32,7 @@ def __init__(self) -> None:
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.topic_without_validation: set[TopicName] = set()

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
Expand Down Expand Up @@ -229,6 +230,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
if schema_version.deleted is False
}

def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
return topic_name not in self.topic_without_validation

def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None:
if skip_validation:
self.topic_without_validation.add(topic_name)
else:
self.topic_without_validation.discard(topic_name)

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
Expand Down
43 changes: 40 additions & 3 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,21 @@
get_subject_name,
InvalidMessageSchema,
InvalidPayload,
SchemaRegistryClient,
SchemaRegistrySerializer,
SchemaRetrievalError,
)
from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType
from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType, TopicName
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import Callable, Dict, Final, List, MutableMapping, NewType, Optional, Tuple, Union

import asyncio
import base64
import datetime
import logging
import time

SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value]
SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value_]
PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"}
RECORD_CODES = [42201, 42202]
KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"}
Expand Down Expand Up @@ -419,6 +420,35 @@ async def topic_publish(self, topic: str, content_type: str, *, request: HTTPReq
await proxy.topic_publish(topic, content_type, request=request)


LastTimeCheck = NewType("LastTimeCheck", float)

DEFAULT_CACHE_INTERVAL_NS: Final = 120 * 1_000_000_000 # 120 seconds


class ValidationCheckWrapper:
def __init__(self, registry_client: SchemaRegistryClient):
self._last_check = 0
# by default if not specified otherwise, let's be conservative
self._require_validation = True
self.registry_client = registry_client

async def _query_registry(self) -> bool:
return True

async def require_validation(self) -> bool:
if (time.monotonic_ns() - self._last_check) > DEFAULT_CACHE_INTERVAL_NS:
self._require_validation = await self._query_registry()
self._last_check = time.monotonic_ns()

return self._require_validation

@classmethod
async def construct_new(cls, registry_client: SchemaRegistryClient):
validation_checker = ValidationCheckWrapper(registry_client=registry_client)
validation_checker._require_validation = await validation_checker._query_registry()
validation_checker._last_check = time.monotonic_ns()


class UserRestProxy:
def __init__(
self,
Expand Down Expand Up @@ -446,6 +476,13 @@ def __init__(
self._async_producer_lock = asyncio.Lock()
self._async_producer: Optional[AIOKafkaProducer] = None
self.naming_strategy = NameStrategy(self.config["name_strategy"])
self.topic_validation: MutableMapping[TopicName,] = {}

async def is_validation_required(self, topic_name: TopicName) -> bool:
if topic_name not in self.topic_validation:
self.topic_validation[topic_name] = await ValidationCheckWrapper.construct_new(self.serializer.registry_client)

return

def __str__(self) -> str:
return f"UserRestProxy(username={self.config['sasl_plain_username']})"
Expand Down
13 changes: 10 additions & 3 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from avro.schema import Schema as AvroSchema
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
Expand All @@ -32,7 +31,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, StrEnum, Subject, TopicName
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand All @@ -59,10 +58,11 @@
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


class MessageType(Enum):
class MessageType(StrEnum):
config = "CONFIG"
schema = "SCHEMA"
delete_subject = "DELETE_SUBJECT"
schema_validation = "SCHEMA_VALIDATION"
no_operation = "NOOP"


Expand Down Expand Up @@ -437,6 +437,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #
LOG.info("Deleting subject: %r, value: %r", subject, value)
self.database.delete_subject(subject=subject, version=version)

def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument
assert isinstance(value, dict)
topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"])
self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]

Expand Down Expand Up @@ -540,6 +545,8 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
self._handle_msg_schema(key, value)
elif message_type == MessageType.delete_subject:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.schema_validation:
self._handle_msg_schema_validation(key, value)
elif message_type == MessageType.no_operation:
pass
except ValueError:
Expand Down
17 changes: 15 additions & 2 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
from karapace.messaging import KarapaceProducer
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_reader import KafkaSchemaReader, MessageType
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName, Version
from typing import Mapping, Sequence

import asyncio
Expand Down Expand Up @@ -466,6 +466,19 @@ def send_schema_message(
value = None
self.producer.send_message(key=key, value=value)

def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
return self.database.is_topic_requiring_validation(topic_name=topic_name)

def update_require_validation_for_topic(
self,
*,
topic_name: TopicName,
require_validation: bool,
) -> None:
key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
value = {"skip_validation": require_validation, "topic": topic_name}
self.producer.send_message(key=key, value=value)

def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
key = {"subject": subject, "magic": 0, "keytype": "CONFIG"}
value = {"compatibilityLevel": compatibility_level.value}
Expand Down
84 changes: 82 additions & 2 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from avro.errors import SchemaParseException
from contextlib import AsyncExitStack
from enum import Enum, unique
from functools import partial
from http import HTTPStatus
from karapace.auth import HTTPAuthorizer, Operation, User
from karapace.compatibility import check_compatibility, CompatibilityModes
Expand All @@ -28,20 +29,30 @@
SubjectSoftDeletedException,
VersionNotFoundException,
)
from karapace.karapace import KarapaceBase
from karapace.karapace import empty_response, KarapaceBase
from karapace.protobuf.exception import ProtobufUnresolvedDependencyException
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
from karapace.utils import JSONDecodeError
from typing import Any
from urllib.parse import urlparse

import aiohttp
import async_timeout


def compute_forwarded_url(master_url: str, request_url: str) -> str:
parsed_master_url = urlparse(master_url)
parser_request_url = urlparse(request_url)
return parser_request_url._replace(
scheme=parsed_master_url.scheme,
netloc=parsed_master_url.netloc,
).geturl()


@unique
class SchemaErrorCodes(Enum):
HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value
Expand Down Expand Up @@ -301,6 +312,33 @@ def _add_schema_registry_routes(self) -> None:
json_body=False,
auth=self._auth,
)
self.route(
"/topic/<topic:path>/require_validation",
callback=self.is_topic_requiring_validation,
method="GET",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)
self.route(
"/topic/<topic:path>/disable_validation",
callback=partial(self.set_topic_require_validation, require_validation=False),
method="POST",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)
self.route(
"/topic/<topic:path>/enable_validation",
callback=partial(self.set_topic_require_validation, require_validation=True),
method="POST",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)

async def close(self) -> None:
async with AsyncExitStack() as stack:
Expand Down Expand Up @@ -1238,6 +1276,48 @@ async def subject_post(
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None:
require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic))
reply = {"require_validation": require_validation}
self.r(reply, content_type)

async def set_topic_require_validation(
self,
content_type: str,
request: HTTPRequest,
*,
topic: str,
require_validation: bool,
) -> None:
topic_name = TopicName(topic)

already_skipping_validation = not require_validation and self.schema_registry.database.is_topic_requiring_validation(
topic_name=topic_name
)
already_validating = require_validation and not self.schema_registry.database.is_topic_requiring_validation(
topic_name=topic_name
)

if already_validating or already_skipping_validation:
empty_response()

are_we_master, master_url = await self.schema_registry.get_master()

if are_we_master:
self.schema_registry.update_require_validation_for_topic(
topic_name=topic_name,
require_validation=require_validation,
)
empty_response()
else:
await self._forward_request_remote(
request=request,
body=None,
url=compute_forwarded_url(master_url=master_url, request_url=request.url),
content_type=content_type,
method="POST",
)

def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
schema_id = self.schema_registry.database.get_schema_id_if_exists(
subject=subject, schema=schema, include_deleted=include_deleted
Expand Down
3 changes: 2 additions & 1 deletion karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
from google.protobuf.message import DecodeError
from jsonschema import ValidationError
from karapace.client import Client
from karapace.config import NameStrategy
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.typing import ResolvedVersion, SchemaId, Subject, SubjectType
from karapace.utils import json_decode, json_encode
from typing import Any, Callable, MutableMapping
from urllib.parse import quote
Expand Down
3 changes: 2 additions & 1 deletion karapace/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class NameStrategy(StrEnum, Enum):
@unique
class SubjectType(StrEnum, Enum):
key = "key"
value = "value"
# value it's a property of the Enum class, avoiding the collision.
value_ = "value"
# partition it's a function of `str` and StrEnum its inherits from it.
partition_ = "partition"
Loading

0 comments on commit 65bf208

Please sign in to comment.