From 65bf208aed7a583f7f8c0aff1adec9d5922ed8c9 Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Tue, 24 Oct 2023 10:09:51 +0200
Subject: [PATCH] feat: enable users to disable validation only on specific topics

---
 README.rst                             |  2 +-
 karapace/in_memory_database.py         | 12 +++-
 karapace/kafka_rest_apis/__init__.py   | 43 ++++++++++++-
 karapace/schema_reader.py              | 13 +++-
 karapace/schema_registry.py            | 17 +++++-
 karapace/schema_registry_apis.py       | 84 +++++++++++++++++++++++++-
 karapace/serialization.py              |  3 +-
 karapace/typing.py                     |  3 +-
 tests/integration/conftest.py          | 25 ++++++--
 tests/integration/test_rest.py         | 60 ++++++++++++++++++
 tests/integration/utils/cluster.py     | 14 ++++-
 tests/unit/test_schema_registry_api.py | 12 +++-
 tests/unit/test_serialization.py       | 18 +++---
 13 files changed, 273 insertions(+), 33 deletions(-)

diff --git a/README.rst b/README.rst
index 288685281..d8c7bcd3f 100644
--- a/README.rst
+++ b/README.rst
@@ -464,7 +464,7 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
   - Name strategy to use when storing schemas from the kafka rest proxy service. You can opt between ``name_strategy`` , ``record_name`` and ``topic_record_name``
 * - ``name_strategy_validation``
   - ``true``
-  - If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
+  - If enabled, validate that the given schema is registered under the subjects required by the specified name strategy when producing messages from Kafka Rest. Otherwise no validation is performed.
 * - ``master_election_strategy``
   - ``lowest``
   - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
diff --git a/karapace/in_memory_database.py b/karapace/in_memory_database.py
index 222e38046..ea863faa0 100644
--- a/karapace/in_memory_database.py
+++ b/karapace/in_memory_database.py
@@ -9,7 +9,7 @@
 from dataclasses import dataclass, field
 from karapace.schema_models import SchemaVersion, TypedSchema
 from karapace.schema_references import Reference, Referents
-from karapace.typing import ResolvedVersion, SchemaId, Subject
+from karapace.typing import ResolvedVersion, SchemaId, Subject, TopicName
 from threading import Lock, RLock
 from typing import Iterable, Sequence

@@ -32,6 +32,7 @@ def __init__(self) -> None:
         self.schemas: dict[SchemaId, TypedSchema] = {}
         self.schema_lock_thread = RLock()
         self.referenced_by: dict[tuple[Subject, ResolvedVersion], Referents] = {}
+        self.topic_without_validation: set[TopicName] = set()
         # Content based deduplication of schemas. This is used to reduce memory
         # usage when the same schema is produce multiple times to the same or
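The per-topic toggle is plain bookkeeping on the in-memory database: validation stays required unless the topic has been explicitly added to ``topic_without_validation``. A minimal standalone sketch of the same opt-out semantics (simplified names, not the actual Karapace classes):

    from typing import NewType

    TopicName = NewType("TopicName", str)


    class TopicValidationRegistry:
        """Validation is opt-out: a topic requires validation unless explicitly disabled."""

        def __init__(self) -> None:
            self._topics_without_validation: set[TopicName] = set()

        def is_topic_requiring_validation(self, topic_name: TopicName) -> bool:
            return topic_name not in self._topics_without_validation

        def override_topic_validation(self, topic_name: TopicName, skip_validation: bool) -> None:
            # add() / discard() keep the operation idempotent, mirroring the patch above
            if skip_validation:
                self._topics_without_validation.add(topic_name)
            else:
                self._topics_without_validation.discard(topic_name)


    registry = TopicValidationRegistry()
    assert registry.is_topic_requiring_validation(TopicName("orders"))       # default: validate
    registry.override_topic_validation(TopicName("orders"), skip_validation=True)
    assert not registry.is_topic_requiring_validation(TopicName("orders"))   # opted out
    registry.override_topic_validation(TopicName("orders"), skip_validation=False)
    assert registry.is_topic_requiring_validation(TopicName("orders"))       # re-enabled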
@@ -229,6 +230,15 @@ def find_subject_schemas(self, *, subject: Subject, include_deleted: bool) -> di
             if schema_version.deleted is False
         }

+    def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
+        return topic_name not in self.topic_without_validation
+
+    def override_topic_validation(self, *, topic_name: TopicName, skip_validation: bool) -> None:
+        if skip_validation:
+            self.topic_without_validation.add(topic_name)
+        else:
+            self.topic_without_validation.discard(topic_name)
+
     def delete_subject(self, *, subject: Subject, version: ResolvedVersion) -> None:
         with self.schema_lock_thread:
             for schema_version in self.subjects[subject].schemas.values():
diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py
index b47dabad2..930b0cdb2 100644
--- a/karapace/kafka_rest_apis/__init__.py
+++ b/karapace/kafka_rest_apis/__init__.py
@@ -32,12 +32,13 @@
     get_subject_name,
     InvalidMessageSchema,
     InvalidPayload,
+    SchemaRegistryClient,
     SchemaRegistrySerializer,
     SchemaRetrievalError,
 )
-from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType
+from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType, TopicName
 from karapace.utils import convert_to_int, json_encode, KarapaceKafkaClient
-from typing import Callable, Dict, List, Optional, Tuple, Union
+from typing import Callable, Dict, Final, List, MutableMapping, NewType, Optional, Tuple, Union

 import asyncio
 import base64
@@ -45,7 +46,7 @@
 import logging
 import time

-SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value]
+SUBJECT_VALID_POSTFIX = [SubjectType.key, SubjectType.value_]
 PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"}
 RECORD_CODES = [42201, 42202]
 KNOWN_FORMATS = {"json", "avro", "protobuf", "binary"}
@@ -419,6 +420,35 @@ async def topic_publish(self, topic: str, content_type: str, *, request: HTTPReq
         await proxy.topic_publish(topic, content_type, request=request)


+LastTimeCheck = NewType("LastTimeCheck", float)
+
+DEFAULT_CACHE_INTERVAL_NS: Final = 120 * 1_000_000_000  # 120 seconds
+
+
+class ValidationCheckWrapper:
+    def __init__(self, registry_client: SchemaRegistryClient):
+        self._last_check = 0
+        # by default if not specified otherwise, let's be conservative
+        self._require_validation = True
+        self.registry_client = registry_client
+
+    async def _query_registry(self) -> bool:
+        return True
+
+    async def require_validation(self) -> bool:
+        if (time.monotonic_ns() - self._last_check) > DEFAULT_CACHE_INTERVAL_NS:
+            self._require_validation = await self._query_registry()
+            self._last_check = time.monotonic_ns()
+
+        return self._require_validation
+
+    @classmethod
+    async def construct_new(cls, registry_client: SchemaRegistryClient):
+        validation_checker = ValidationCheckWrapper(registry_client=registry_client)
+        validation_checker._require_validation = await validation_checker._query_registry()
+        validation_checker._last_check = time.monotonic_ns()
+        return validation_checker
+
+
 class UserRestProxy:
     def __init__(
         self,
@@ -446,6 +476,13 @@ def __init__(
         self._async_producer_lock = asyncio.Lock()
         self._async_producer: Optional[AIOKafkaProducer] = None
         self.naming_strategy = NameStrategy(self.config["name_strategy"])
+        self.topic_validation: MutableMapping[TopicName, ValidationCheckWrapper] = {}
+
+    async def is_validation_required(self, topic_name: TopicName) -> bool:
+        if topic_name not in self.topic_validation:
+            self.topic_validation[topic_name] = await ValidationCheckWrapper.construct_new(self.serializer.registry_client)
+
+        return await self.topic_validation[topic_name].require_validation()
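On the REST proxy side the answer is cached per topic and refreshed at most every 120 seconds (``DEFAULT_CACHE_INTERVAL_NS`` compared against ``time.monotonic_ns()``); note that in this hunk ``_query_registry`` is still a ``return True`` placeholder rather than an actual call to the registry's new ``/topic/.../require_validation`` endpoint. A self-contained sketch of the same time-based caching, with the registry round trip replaced by a stub so it runs standalone:

    import asyncio
    import time

    CACHE_INTERVAL_NS = 120 * 1_000_000_000  # 120 seconds, same as DEFAULT_CACHE_INTERVAL_NS


    class CachedValidationCheck:
        """Caches a boolean "is validation required?" answer for a fixed interval."""

        def __init__(self, query) -> None:
            self._query = query
            self._last_check_ns = None       # None forces a query on first use
            self._require_validation = True  # conservative default until the first query

        async def require_validation(self) -> bool:
            stale = self._last_check_ns is None or (time.monotonic_ns() - self._last_check_ns) > CACHE_INTERVAL_NS
            if stale:
                self._require_validation = await self._query()
                self._last_check_ns = time.monotonic_ns()
            return self._require_validation


    async def main() -> None:
        calls = 0

        async def fake_registry_query() -> bool:  # stand-in for the registry round trip
            nonlocal calls
            calls += 1
            return False

        check = CachedValidationCheck(fake_registry_query)
        assert await check.require_validation() is False
        assert await check.require_validation() is False  # served from cache
        assert calls == 1                                  # registry queried only once


    asyncio.run(main())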

     def __str__(self) -> str:
         return f"UserRestProxy(username={self.config['sasl_plain_username']})"
diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index 36a9bd0e0..4eab58dcf 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -8,7 +8,6 @@
 from avro.schema import Schema as AvroSchema
 from contextlib import closing, ExitStack
-from enum import Enum
 from jsonschema.validators import Draft7Validator
 from kafka import KafkaConsumer, TopicPartition
 from kafka.admin import KafkaAdminClient, NewTopic
@@ -32,7 +31,7 @@
 from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
 from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
 from karapace.statsd import StatsClient
-from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
+from karapace.typing import JsonObject, ResolvedVersion, SchemaId, StrEnum, Subject, TopicName
 from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
 from threading import Event, Thread
 from typing import Final, Mapping, Sequence
@@ -59,10 +58,11 @@
 METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions"


-class MessageType(Enum):
+class MessageType(StrEnum):
     config = "CONFIG"
     schema = "SCHEMA"
     delete_subject = "DELETE_SUBJECT"
+    schema_validation = "SCHEMA_VALIDATION"
     no_operation = "NOOP"


@@ -437,6 +437,11 @@ def _handle_msg_delete_subject(self, key: dict, value: dict | None) -> None:  #
         LOG.info("Deleting subject: %r, value: %r", subject, value)
         self.database.delete_subject(subject=subject, version=version)

+    def _handle_msg_schema_validation(self, key: dict, value: dict | None) -> None:  # pylint: disable=unused-argument
+        assert isinstance(value, dict)
+        topic, skip_validation = TopicName(value["topic"]), bool(value["skip_validation"])
+        self.database.override_topic_validation(topic_name=topic, skip_validation=skip_validation)
+
     def _handle_msg_schema_hard_delete(self, key: dict) -> None:
         subject, version = key["subject"], key["version"]

@@ -540,6 +545,8 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
                 self._handle_msg_schema(key, value)
             elif message_type == MessageType.delete_subject:
                 self._handle_msg_delete_subject(key, value)
+            elif message_type == MessageType.schema_validation:
+                self._handle_msg_schema_validation(key, value)
             elif message_type == MessageType.no_operation:
                 pass
         except ValueError:
diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py
index 867eeb633..92d18219e 100644
--- a/karapace/schema_registry.py
+++ b/karapace/schema_registry.py
@@ -27,9 +27,9 @@
 from karapace.messaging import KarapaceProducer
 from karapace.offset_watcher import OffsetWatcher
 from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
-from karapace.schema_reader import KafkaSchemaReader
+from karapace.schema_reader import KafkaSchemaReader, MessageType
 from karapace.schema_references import LatestVersionReference, Reference
-from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, Version
+from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject, TopicName, Version
 from typing import Mapping, Sequence

 import asyncio
@@ -466,6 +466,19 @@ def send_schema_message(
             value = None
         self.producer.send_message(key=key, value=value)

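Like config and schema changes, the toggle is replicated through the schemas topic: the registry writes a ``SCHEMA_VALIDATION`` record (producer side in the next hunk) and each node's ``KafkaSchemaReader`` applies it via ``_handle_msg_schema_validation`` above. The payloads below are illustrative only, assuming ``str(MessageType.schema_validation)`` renders as ``SCHEMA_VALIDATION`` and that disabling validation for topic ``orders`` sends ``skip_validation: true``:

    import json

    # Key and value as built by update_require_validation_for_topic() below when a
    # client disables validation for topic "orders" (require_validation=False).
    key = {"topic": "orders", "keytype": "SCHEMA_VALIDATION", "magic": 0}
    value = {"skip_validation": True, "topic": "orders"}
    print(json.dumps(key), json.dumps(value))

    # Reader side: _handle_msg_schema_validation() unpacks the value and updates the
    # in-memory set, so every node converges on the same per-topic setting.
    topic, skip_validation = value["topic"], bool(value["skip_validation"])
    assert (topic, skip_validation) == ("orders", True)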
+    def is_topic_requiring_validation(self, *, topic_name: TopicName) -> bool:
+        return self.database.is_topic_requiring_validation(topic_name=topic_name)
+
+    def update_require_validation_for_topic(
+        self,
+        *,
+        topic_name: TopicName,
+        require_validation: bool,
+    ) -> None:
+        key = {"topic": topic_name, "keytype": str(MessageType.schema_validation), "magic": 0}
+        value = {"skip_validation": not require_validation, "topic": topic_name}
+        self.producer.send_message(key=key, value=value)
+
     def send_config_message(self, compatibility_level: CompatibilityModes, subject: Subject | None = None) -> None:
         key = {"subject": subject, "magic": 0, "keytype": "CONFIG"}
         value = {"compatibilityLevel": compatibility_level.value}
diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py
index f4d22cd78..bbe415fec 100644
--- a/karapace/schema_registry_apis.py
+++ b/karapace/schema_registry_apis.py
@@ -7,6 +7,7 @@
 from avro.errors import SchemaParseException
 from contextlib import AsyncExitStack
 from enum import Enum, unique
+from functools import partial
 from http import HTTPStatus
 from karapace.auth import HTTPAuthorizer, Operation, User
 from karapace.compatibility import check_compatibility, CompatibilityModes
@@ -28,20 +29,30 @@
     SubjectSoftDeletedException,
     VersionNotFoundException,
 )
-from karapace.karapace import KarapaceBase
+from karapace.karapace import empty_response, KarapaceBase
 from karapace.protobuf.exception import ProtobufUnresolvedDependencyException
 from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME
 from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema
 from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
 from karapace.schema_registry import KarapaceSchemaRegistry, validate_version
-from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId
+from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, TopicName
 from karapace.utils import JSONDecodeError
 from typing import Any
+from urllib.parse import urlparse

 import aiohttp
 import async_timeout


+def compute_forwarded_url(master_url: str, request_url: str) -> str:
+    parsed_master_url = urlparse(master_url)
+    parsed_request_url = urlparse(request_url)
+    return parsed_request_url._replace(
+        scheme=parsed_master_url.scheme,
+        netloc=parsed_master_url.netloc,
+    ).geturl()
+
+
 @unique
 class SchemaErrorCodes(Enum):
     HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value
@@ -301,6 +312,33 @@ def _add_schema_registry_routes(self) -> None:
             json_body=False,
             auth=self._auth,
         )
+        self.route(
+            "/topic/<topic:path>/require_validation",
+            callback=self.is_topic_requiring_validation,
+            method="GET",
+            schema_request=True,
+            with_request=True,
+            json_body=False,
+            auth=None,
+        )
+        self.route(
+            "/topic/<topic:path>/disable_validation",
+            callback=partial(self.set_topic_require_validation, require_validation=False),
+            method="POST",
+            schema_request=True,
+            with_request=True,
+            json_body=False,
+            auth=None,
+        )
+        self.route(
+            "/topic/<topic:path>/enable_validation",
+            callback=partial(self.set_topic_require_validation, require_validation=True),
+            method="POST",
+            schema_request=True,
+            with_request=True,
+            json_body=False,
+            auth=None,
+        )

     async def close(self) -> None:
         async with AsyncExitStack() as stack:
@@ -1238,6 +1276,48 @@ async def subject_post(
         url = f"{master_url}/subjects/{subject}/versions"
         await self._forward_request_remote(request=request, body=body, url=url, content_type=content_type, method="POST")

+    async def 
is_topic_requiring_validation(self, content_type: str, *, topic: str) -> None: + require_validation = self.schema_registry.is_topic_requiring_validation(topic_name=TopicName(topic)) + reply = {"require_validation": require_validation} + self.r(reply, content_type) + + async def set_topic_require_validation( + self, + content_type: str, + request: HTTPRequest, + *, + topic: str, + require_validation: bool, + ) -> None: + topic_name = TopicName(topic) + + already_skipping_validation = not require_validation and self.schema_registry.database.is_topic_requiring_validation( + topic_name=topic_name + ) + already_validating = require_validation and not self.schema_registry.database.is_topic_requiring_validation( + topic_name=topic_name + ) + + if already_validating or already_skipping_validation: + empty_response() + + are_we_master, master_url = await self.schema_registry.get_master() + + if are_we_master: + self.schema_registry.update_require_validation_for_topic( + topic_name=topic_name, + require_validation=require_validation, + ) + empty_response() + else: + await self._forward_request_remote( + request=request, + body=None, + url=compute_forwarded_url(master_url=master_url, request_url=request.url), + content_type=content_type, + method="POST", + ) + def get_schema_id_if_exists(self, *, subject: str, schema: TypedSchema, include_deleted: bool) -> SchemaId | None: schema_id = self.schema_registry.database.get_schema_id_if_exists( subject=subject, schema=schema, include_deleted=include_deleted diff --git a/karapace/serialization.py b/karapace/serialization.py index c199bad7a..66a7090b1 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -11,6 +11,7 @@ from google.protobuf.message import DecodeError from jsonschema import ValidationError from karapace.client import Client +from karapace.config import NameStrategy from karapace.dependency import Dependency from karapace.errors import InvalidReferences from karapace.protobuf.exception import ProtobufTypeException @@ -18,7 +19,7 @@ from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping -from karapace.typing import NameStrategy, ResolvedVersion, SchemaId, Subject, SubjectType +from karapace.typing import ResolvedVersion, SchemaId, Subject, SubjectType from karapace.utils import json_decode, json_encode from typing import Any, Callable, MutableMapping from urllib.parse import quote diff --git a/karapace/typing.py b/karapace/typing.py index 48c6bd815..8ec0b0bec 100644 --- a/karapace/typing.py +++ b/karapace/typing.py @@ -46,6 +46,7 @@ class NameStrategy(StrEnum, Enum): @unique class SubjectType(StrEnum, Enum): key = "key" - value = "value" + # value it's a property of the Enum class, avoiding the collision. + value_ = "value" # partition it's a function of `str` and StrEnum its inherits from it. 
partition_ = "partition" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1c16b3c2b..27be8887e 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,7 +7,7 @@ from _pytest.fixtures import SubRequest from aiohttp.pytest_plugin import AiohttpClient from aiohttp.test_utils import TestClient -from contextlib import closing, ExitStack +from contextlib import asynccontextmanager, closing, ExitStack from dataclasses import asdict from filelock import FileLock from kafka import KafkaProducer @@ -255,7 +255,6 @@ async def fixture_rest_async( @pytest.fixture(scope="function", name="rest_async_client") async def fixture_rest_async_client( request: SubRequest, - loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument rest_async: KafkaRest, aiohttp_client: AiohttpClient, ) -> AsyncIterator[Client]: @@ -452,8 +451,8 @@ async def fixture_registry_async_pair( yield [server.endpoint.to_url() for server in endpoints] -@pytest.fixture(scope="function", name="registry_cluster") -async def fixture_registry_cluster( +@asynccontextmanager +async def _registry_cluster( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, @@ -480,8 +479,8 @@ async def fixture_registry_cluster( yield servers[0] -@pytest.fixture(scope="function", name="registry_async_client") -async def fixture_registry_async_client( +@asynccontextmanager +async def _registry_async_client( request: SubRequest, registry_cluster: RegistryDescription, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument @@ -507,6 +506,20 @@ async def fixture_registry_async_client( await client.close() +@pytest.fixture(scope="function", name="registry_async_client") +async def fixture_registry_async_client( + request: SubRequest, + registry_cluster: RegistryDescription, + loop: asyncio.AbstractEventLoop, +) -> Client: + async with _registry_async_client( + request, + registry_cluster, + loop, + ) as client: + yield client + + @pytest.fixture(scope="function", name="credentials_folder") def fixture_credentials_folder() -> str: integration_test_folder = os.path.dirname(__file__) diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index dc551dad0..de1c1476e 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -8,6 +8,8 @@ from kafka.errors import UnknownTopicOrPartitionError from karapace.client import Client from karapace.kafka_rest_apis import KafkaRest, KafkaRestAdminClient, SUBJECT_VALID_POSTFIX +from karapace.schema_models import ValidatedTypedSchema +from karapace.schema_type import SchemaType from karapace.version import __version__ from pytest import raises from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES @@ -651,6 +653,64 @@ async def test_publish_with_schema_id_of_another_subject_novalidation( assert res.status_code == 200 +async def test_can_produce_anything_with_no_validation_policy( + rest_async_client: Client, + registry_async_client: Client, + admin_client: KafkaRestAdminClient, +) -> None: + topic_name = new_topic(admin_client) + + await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + + typed_schema = ValidatedTypedSchema.parse( + SchemaType.AVRO, + json.dumps( + { + "type": "record", + "name": "Schema1", + "fields": [ + { + "name": "name", + "type": "string", + }, + ], + } + ), + ) + + res = await registry_async_client.post(f"/topic/{topic_name}/disable_validation", json={}) + assert 
res.ok + + # with the no_validation strategy we can produce even if we use a totally random subject name + create_messages_url = f"/topics/{topic_name}" + + res = await registry_async_client.post( + "subjects/random_subject_name/versions", + json={"schema": str(typed_schema)}, + ) + assert res.status_code == 200 + random_subject_name_id = res.json()["id"] + + res = await rest_async_client.post( + create_messages_url, + json={"value_schema_id": random_subject_name_id, "records": [{"value": {"name": "Mr. Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 200 + + # but if we enable it again and we wait for the cache to be freed we should get an error again + res = await registry_async_client.post(f"/topic/{topic_name}/enable_validation", json={}) + assert res.ok + + # wait for the cache + res = await rest_async_client.post( + create_messages_url, + json={"value_schema_id": random_subject_name_id, "records": [{"value": {"name": "Mr. Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 422 + + async def test_brokers(rest_async_client: Client) -> None: res = await rest_async_client.get("/brokers") assert res.ok diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py index 31c06e4bd..495a4292c 100644 --- a/tests/integration/utils/cluster.py +++ b/tests/integration/utils/cluster.py @@ -4,12 +4,12 @@ """ from contextlib import asynccontextmanager, ExitStack from dataclasses import dataclass -from karapace.config import Config, set_config_defaults, write_config +from karapace.config import Config, ConfigDefaults, set_config_defaults, write_config from pathlib import Path from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.utils import new_random_name, popen_karapace_all -from typing import AsyncIterator, List +from typing import AsyncIterator, List, Optional @dataclass(frozen=True) @@ -33,6 +33,7 @@ async def start_schema_registry_cluster( config_templates: List[Config], data_dir: Path, port_range: PortRangeInclusive, + custom_values: Optional[ConfigDefaults] = None, ) -> AsyncIterator[List[RegistryDescription]]: """Start a cluster of schema registries, one process per `config_templates`.""" for template in config_templates: @@ -76,7 +77,14 @@ async def start_schema_registry_cluster( log_path = group_dir / f"{pos}.log" error_path = group_dir / f"{pos}.error" - config = set_config_defaults(config) + config = ( + set_config_defaults(config) + if custom_values is None + else { + **set_config_defaults(config), + **custom_values, + } + ) write_config(config_path, config) logfile = stack.enter_context(open(log_path, "w")) diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 3b3cc1356..bc6019951 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -5,7 +5,7 @@ from aiohttp.test_utils import TestClient, TestServer from karapace.config import DEFAULTS, set_config_defaults from karapace.rapu import HTTPResponse -from karapace.schema_registry_apis import KarapaceSchemaRegistryController +from karapace.schema_registry_apis import compute_forwarded_url, KarapaceSchemaRegistryController from unittest.mock import ANY, Mock, patch, PropertyMock import asyncio @@ -52,3 +52,13 @@ async def test_forward_when_not_ready(): mock_forward_func.assert_called_once_with( request=ANY, body=None, url="http://primary-url/schemas/ids/1", 
content_type="application/json", method="GET" ) + + +def test_compute_forwarded_url() -> None: + assert ( + compute_forwarded_url( + master_url="http://localhost:8081/another/fancy/path", + request_url="https://docs.python.org/3/library/urllib.parse.html", + ) + == "http://localhost:8081/3/library/urllib.parse.html" + ) diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index 2e09fec82..d2d62258d 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -365,9 +365,9 @@ async def test_deserialization_fails(default_config_path: Path): (Subject("foo-key"), NameStrategy.topic_name, SubjectType.key), (Subject("io.aiven.data.Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-io.aiven.data.Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("foo-value"), NameStrategy.topic_name, SubjectType.value), - (Subject("io.aiven.data.Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-io.aiven.data.Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("foo-value"), NameStrategy.topic_name, SubjectType.value_), + (Subject("io.aiven.data.Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-io.aiven.data.Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_avro(expected_subject: Subject, strategy: NameStrategy, subject_type: SubjectType): @@ -382,8 +382,8 @@ def test_name_strategy_for_avro(expected_subject: Subject, strategy: NameStrateg ( (Subject("Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_json_schema(expected_subject: Subject, strategy: NameStrategy, subject_type: SubjectType): @@ -398,8 +398,8 @@ def test_name_strategy_for_json_schema(expected_subject: Subject, strategy: Name ( (Subject("Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_avro_without_namespace( @@ -418,8 +418,8 @@ def test_name_strategy_for_avro_without_namespace( ( (Subject("Test"), NameStrategy.record_name, SubjectType.key), (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.key), - (Subject("Test"), NameStrategy.record_name, SubjectType.value), - (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value), + (Subject("Test"), NameStrategy.record_name, SubjectType.value_), + (Subject("foo-Test"), NameStrategy.topic_record_name, SubjectType.value_), ), ) def test_name_strategy_for_protobuf(expected_subject: Subject, strategy: NameStrategy, subject_type: SubjectType):
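For reference, the new endpoints are exposed by the schema registry (not the REST proxy) and can be exercised with any HTTP client; a sketch using ``requests``, assuming a registry listening on ``localhost:8081`` and a topic named ``orders``:

    import requests

    REGISTRY = "http://localhost:8081"  # assumed local registry address
    TOPIC = "orders"

    # Check the current state (defaults to validation being required).
    resp = requests.get(f"{REGISTRY}/topic/{TOPIC}/require_validation", timeout=10)
    print(resp.json())  # {'require_validation': True}

    # Opt the topic out of name-strategy validation...
    requests.post(f"{REGISTRY}/topic/{TOPIC}/disable_validation", json={}, timeout=10)

    # ...and back in again. Non-master nodes forward the POST to the master via
    # compute_forwarded_url(), so any node in the cluster can be targeted.
    requests.post(f"{REGISTRY}/topic/{TOPIC}/enable_validation", json={}, timeout=10)

Because the REST proxy refreshes its cached per-topic answer at most every 120 seconds, a toggle may take up to that long to be reflected on the produce path.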