Skip to content

Commit

Permalink
feat: add configurable validation strategy by topic
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Oct 28, 2023
1 parent 101d69e commit a314cac
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 32 deletions.
1 change: 1 addition & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class NameStrategy(Enum):
topic_name = "topic_name"
record_name = "record_name"
topic_record_name = "topic_record_name"
no_validation = "no_validation_strategy"


def parse_env_value(value: str) -> str | int | bool:
Expand Down
13 changes: 12 additions & 1 deletion karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from __future__ import annotations

from dataclasses import dataclass, field
from karapace.config import NameStrategy
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_references import Reference, Referents
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
from threading import Lock, RLock
from typing import Iterable, Sequence

Expand All @@ -32,6 +33,7 @@ def __init__(self) -> None:
self.schemas: dict[SchemaId, TypedSchema] = {}
self.schema_lock_thread = RLock()
self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
self.topic_validation_strategies: dict[TopicName, NameStrategy] = {}

# Content based deduplication of schemas. This is used to reduce memory
# usage when the same schema is produce multiple times to the same or
Expand Down Expand Up @@ -229,6 +231,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
if schema_version.deleted is False
}

def get_topic_strategy(self, *, topic_name: TopicName) -> NameStrategy:
if topic_name not in self.topic_validation_strategies:
return NameStrategy.topic_name

return self.topic_validation_strategies[topic_name]

def override_topic_strategy(self, *, topic_name: TopicName, name_strategy: NameStrategy) -> None:
self.topic_validation_strategies[topic_name] = name_strategy

def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
with self.schema_lock_thread:
for schema_version in self.subjects[subject].schemas.values():
Expand Down
16 changes: 11 additions & 5 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
TopicAuthorizationFailedError,
UnknownTopicOrPartitionError,
)
from karapace.config import Config, create_client_ssl_context
from karapace.config import Config, create_client_ssl_context, NameStrategy
from karapace.errors import InvalidSchema
from karapace.kafka_rest_apis.admin import KafkaRestAdminClient
from karapace.kafka_rest_apis.authentication import (
Expand All @@ -29,7 +29,7 @@
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.serialization import InvalidMessageSchema, InvalidPayload, SchemaRegistrySerializer, SchemaRetrievalError
from karapace.typing import SchemaId, Subject
from karapace.typing import SchemaId, Subject, TopicName
from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
from typing import Callable, Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -773,26 +773,32 @@ async def get_schema_id(
SchemaId(int(data[f"{prefix}_schema_id"])) if f"{prefix}_schema_id" in data else None
)
schema_str = data.get(f"{prefix}_schema")
naming_strategy = await self.serializer.get_topic_strategy_name(topic_name=TopicName(topic))

if schema_id is None and schema_str is None:
raise InvalidSchema()

if schema_id is None:
parsed_schema = ValidatedTypedSchema.parse(schema_type, schema_str)
subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type)

subject_name = self.serializer.get_subject_name(topic, parsed_schema, prefix, schema_type, naming_strategy)
schema_id = await self._query_schema_id_from_cache_or_registry(parsed_schema, schema_str, subject_name)
else:

def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type)
subject = self.serializer.get_subject_name(topic, schema, prefix, schema_type, naming_strategy)
return subject not in subjects

parsed_schema, valid_subjects = await self._query_schema_and_subjects(
schema_id,
need_new_call=subject_not_included,
)

if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
if (
self.config["name_strategy_validation"]
and naming_strategy != NameStrategy.no_validation
and subject_not_included(parsed_schema, valid_subjects)
):
raise InvalidSchema()

return schema_id
Expand Down
47 changes: 37 additions & 10 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from avro.schema import Schema as AvroSchema
from contextlib import closing, ExitStack
from enum import Enum
from jsonschema.validators import Draft7Validator
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient, NewTopic
Expand All @@ -20,7 +21,7 @@
TopicAlreadyExistsError,
)
from karapace import constants
from karapace.config import Config
from karapace.config import Config, NameStrategy
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences, InvalidSchema
from karapace.in_memory_database import InMemoryDatabase
Expand All @@ -31,7 +32,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
from typing import Final, Mapping, Sequence
Expand All @@ -58,6 +59,14 @@
METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


class MessageType(Enum):
config = "CONFIG"
schema = "SCHEMA"
delete_subject = "DELETE_SUBJECT"
schema_strategy = "SCHEMA_STRATEGY"
no_operation = "NOOP"


def _create_consumer_from_config(config: Config) -> KafkaConsumer:
# Group not set on purpose, all consumers read the same data
session_timeout_ms = config["session_timeout_ms"]
Expand Down Expand Up @@ -429,6 +438,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None: #
LOG.info("Deleting subject: %r, value: %r", subject, value)
self.database.delete_subject(subject=subject, version=version)

def _handle_msg_schema_strategy(self, key: dict, value: dict | None) -> None: # pylint: disable=unused-argument
assert isinstance(value, dict)
topic, strategy = value["topic"], value["strategy"]
self.database.override_topic_strategy(topic_name=TopicName(topic), name_strategy=NameStrategy(strategy))

def _handle_msg_schema_hard_delete(self, key: dict) -> None:
subject, version = key["subject"], key["version"]

Expand Down Expand Up @@ -522,14 +536,27 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

def handle_msg(self, key: dict, value: dict | None) -> None:
if key["keytype"] == "CONFIG":
self._handle_msg_config(key, value)
elif key["keytype"] == "SCHEMA":
self._handle_msg_schema(key, value)
elif key["keytype"] == "DELETE_SUBJECT":
self._handle_msg_delete_subject(key, value)
elif key["keytype"] == "NOOP": # for spec completeness
pass
if "keytype" in key:
try:
message_type = MessageType(key["keytype"])

if message_type == MessageType.config:
self._handle_msg_config(key, value)
elif message_type == MessageType.schema:
self._handle_msg_schema(key, value)
elif message_type == MessageType.delete_subject:
self._handle_msg_delete_subject(key, value)
elif message_type == MessageType.schema_strategy:
self._handle_msg_schema_strategy(key, value)
elif message_type == MessageType.no_operation:
pass
except ValueError:
LOG.error("The message %s-%s has been discarded because the %s is not managed", key, value, key["keytype"])

else:
LOG.error(
"The message %s-%s has been discarded because doesn't contain the `keytype` key in the key", key, value
)

def remove_referenced_by(
self,
Expand Down
19 changes: 16 additions & 3 deletions karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from contextlib import AsyncExitStack, closing
from karapace.compatibility import check_compatibility, CompatibilityModes
from karapace.compatibility.jsonschema.checks import is_incompatible
from karapace.config import Config
from karapace.config import Config, NameStrategy
from karapace.dependency import Dependency
from karapace.errors import (
IncompatibleSchema,
Expand All @@ -27,9 +27,9 @@
from karapace.messaging import KarapaceProducer
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_reader import KafkaSchemaReader, MessageType
from karapace.schema_references import LatestVersionReference, Reference
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName, Version
from typing import Mapping, Sequence

import asyncio
Expand Down Expand Up @@ -466,6 +466,19 @@ def send_schema_message(
value = None
self.producer.send_message(key=key, value=value)

def get_validation_strategy_for_topic(self, *, topic_name: TopicName) -> NameStrategy:
return self.database.get_topic_strategy(topic_name=topic_name)

def send_validation_strategy_for_topic(
self,
*,
topic_name: TopicName,
validation_strategy: NameStrategy,
) -> None:
key = {"topic": topic_name, "keytype": MessageType.schema_strategy.value, "magic": 0}
value = {"strategy": validation_strategy.value, "topic": topic_name}
self.producer.send_message(key=key, value=value)

def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
key = {"subject": subject, "magic": 0, "keytype": "CONFIG"}
value = {"compatibilityLevel": compatibility_level.value}
Expand Down
93 changes: 90 additions & 3 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from karapace.auth import HTTPAuthorizer, Operation, User
from karapace.compatibility import check_compatibility, CompatibilityModes
from karapace.compatibility.jsonschema.checks import is_incompatible
from karapace.config import Config
from karapace.config import Config, NameStrategy
from karapace.errors import (
IncompatibleSchema,
InvalidReferences,
Expand All @@ -28,13 +28,13 @@
SubjectSoftDeletedException,
VersionNotFoundException,
)
from karapace.karapace import KarapaceBase
from karapace.karapace import empty_response, KarapaceBase
from karapace.protobuf.exception import ProtobufUnresolvedDependencyException
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
from karapace.utils import JSONDecodeError
from typing import Any

Expand Down Expand Up @@ -301,6 +301,23 @@ def _add_schema_registry_routes(self) -> None:
json_body=False,
auth=self._auth,
)
self.route(
"/topic/<topic:path>/name_strategy",
callback=self.subject_validation_strategy_get,
method="GET",
schema_request=True,
json_body=False,
auth=None,
)
self.route(
"/topic/<topic:path>/name_strategy/<strategy:path>",
callback=self.subject_validation_strategy_set,
method="POST",
schema_request=True,
with_request=True,
json_body=False,
auth=None,
)

async def close(self) -> None:
async with AsyncExitStack() as stack:
Expand Down Expand Up @@ -985,6 +1002,38 @@ def _validate_schema_type(self, content_type: str, data: JsonData) -> SchemaType
)
return schema_type

def _validate_topic_name(self, topic: str) -> TopicName:
valid_topic_names = self.schema_registry.schema_reader.admin_client.list_topics()

if topic in valid_topic_names:
return TopicName(topic)

self.r(
body={
"error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
"message": f"The topic {topic} isn't existing, proceed with creating it first",
},
content_type=JSON_CONTENT_TYPE,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

def _validate_name_strategy(self, name_strategy: str) -> NameStrategy:
try:
strategy = NameStrategy(name_strategy)
return strategy
except ValueError:
valid_strategies = [strategy.value for strategy in NameStrategy]
error_message = f"Invalid name strategy: {name_strategy}, valid values are {valid_strategies}"

self.r(
body={
"error_code": SchemaErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
"message": error_message,
},
content_type=JSON_CONTENT_TYPE,
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

def _validate_schema_key(self, content_type: str, body: dict) -> None:
if "schema" not in body:
self.r(
Expand Down Expand Up @@ -1238,6 +1287,44 @@ async def subject_post(
url = f"{master_url}/subjects/{subject}/versions"
await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

async def subject_validation_strategy_get(self, content_type: str, *, topic: str) -> None:
strategy_name = self.schema_registry.get_validation_strategy_for_topic(topic_name=TopicName(topic)).value
reply = {"strategy": strategy_name}
self.r(reply, content_type)

async def subject_validation_strategy_set(
self,
content_type: str,
request: HTTPRequest,
*,
topic: str,
strategy: str,
) -> None:
# proceeding with the strategy first since it's cheaper
strategy_name = self._validate_name_strategy(strategy)
# real validation of the topic name commented, do we need to do that? does it make sense?
topic_name = TopicName(topic) # self._validate_topic_name(topic)

are_we_master, master_url = await self.schema_registry.get_master()
if are_we_master:
self.schema_registry.send_validation_strategy_for_topic(
topic_name=topic_name,
validation_strategy=strategy_name,
)
empty_response()
else:
# I don't really like it, in theory we should parse the URL and change only the host portion while
# keeping the rest the same
url = f"{master_url}/topic/{topic}/name_strategy"

await self._forward_request_remote(
request=request,
body=None,
url=url,
content_type=content_type,
method="POST",
)

def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None:
schema_id = self.schema_registry.database.get_schema_id_if_exists(
subject=subject, schema=schema, include_deleted=include_deleted
Expand Down
Loading

0 comments on commit a314cac

Please sign in to comment.