diff --git a/karapace/avro_compatibility.py b/karapace/avro_compatibility.py
index 70e192709..8fcbb8e52 100644
--- a/karapace/avro_compatibility.py
+++ b/karapace/avro_compatibility.py
@@ -1,5 +1,4 @@
-from avro.io import Validate  # type: ignore
-from avro.schema import (  # type: ignore
+from avro.schema import (
     ARRAY, ArraySchema, BOOLEAN, BYTES, DOUBLE, ENUM, EnumSchema, Field, FIXED, FixedSchema, FLOAT, INT, LONG, MAP,
     MapSchema, NamedSchema, Names, NULL, RECORD, RecordSchema, Schema, SchemaFromJSONData, STRING, UNION, UnionSchema
 )
@@ -27,80 +26,8 @@ def parse_avro_schema_definition(s: str) -> Schema:
 
         json_data = json.loads(s[:e.pos])
 
-    schema = SchemaFromJSONData(json_data, Names())
-
-    validate_schema_defaults(schema)
-    return schema
-
-
-class ValidateSchemaDefaultsException(Exception):
-    def __init__(self, schema: Schema, default: Any, path: List[str]) -> None:
-        prefix = ": ".join(path)
-        msg = f"{prefix}: default {default} does not match schema {schema.to_json()}"
-        super().__init__(msg)
-
-
-def validate_schema_defaults(schema: Schema) -> None:
-    """ This function validates that the defaults that are defined in the schema actually
-    match their schema. We try to build a proper error message internally and then throw it
-    to the user as a readable exception message.
-
-    Consider for example the schema: {'type': 'enum', 'symbols': ['A','B'], 'default':'C'}
-    """
-
-    def _validate_field_default(f: Field, acc: List[str]) -> None:
-        """This function validates that the default for a field matches the field type
-
-        From the Avro docs: (Note that when a default value is specified for a record
-        field whose type is a union, the type of the default value must match the first
-        element of the union. Thus, for unions containing "null", the "null" is usually
-        listed first, since the default value of such unions is typically null.)
-        """
-
-        if not f.has_default:
-            return
-
-        if isinstance(f.type, UnionSchema):
-            # select the first schema from the union to validate, if its empty just
-            # raise an exception because no default should match anyway
-            if len(f.type.schemas) == 0:
-                # if the union is empty; no default should match
-                raise ValidateSchemaDefaultsException(f.type, f.default, acc)
-            s = f.type.schemas[0]
-        else:
-            s = f.type
-
-        if not Validate(s, f.default):
-            raise ValidateSchemaDefaultsException(f.type, f.default, acc)
-
-    def _validation_crumb(s: Schema) -> str:
-        if hasattr(s, 'name'):
-            return f"bad {s.type} '{s.name}'"
-        return f"bad {s.type}"
-
-    def _validate_schema_defaults(s: Schema, acc: List[str]) -> None:
-        _acc = [*acc, _validation_crumb(s)]
-
-        if "default" in s.props:
-            default = s.props.get("default")
-            if not Validate(s, default):
-                raise ValidateSchemaDefaultsException(s, default, _acc)
-
-        if isinstance(s, RecordSchema):
-            # fields do need to be unwrapped
-            for f in s.fields:
-                _field_acc = [*_acc, f"bad field: '{f.name}'"]
-                _validate_schema_defaults(f.type, _field_acc)
-                _validate_field_default(f, _field_acc)
-        if isinstance(s, ArraySchema):
-            _validate_schema_defaults(s.items, _acc)
-        if isinstance(s, MapSchema):
-            _validate_schema_defaults(s.values, _acc)
-        if isinstance(s, UnionSchema):
-            for u in s.schemas:
-                _validate_schema_defaults(u, _acc)
-
-    _validate_schema_defaults(schema, [])
+    names = Names()
+    return SchemaFromJSONData(json_data, names)
 
 
 def is_incompatible(result: "SchemaCompatibilityResult") -> bool:
diff --git a/karapace/compatibility/__init__.py b/karapace/compatibility/__init__.py
index 2b760d4d1..058565b14 100644
--- a/karapace/compatibility/__init__.py
+++ b/karapace/compatibility/__init__.py
@@ -4,9 +4,9 @@ Copyright (c) 2019 Aiven Ltd
 See LICENSE for details
 """
-from avro.schema import Schema as AvroSchema  # type: ignore
+from avro.schema import Schema as AvroSchema
 from enum import Enum, unique
-from jsonschema import Draft7Validator  # type: ignore
+from jsonschema import Draft7Validator
 from karapace.avro_compatibility import (
     ReaderWriterCompatibilityChecker as AvroChecker, SchemaCompatibilityResult, SchemaCompatibilityType,
     SchemaIncompatibilityType
 )
diff --git a/karapace/serialization.py b/karapace/serialization.py
index 0c0d79ee2..bff12edc5 100644
--- a/karapace/serialization.py
+++ b/karapace/serialization.py
@@ -3,14 +3,12 @@
 from jsonschema import ValidationError
 from karapace.schema_reader import InvalidSchema, SchemaType, TypedSchema
 from karapace.utils import Client, json_encode
-from typing import Any, Dict, Optional
+from typing import Dict, Optional
 from urllib.parse import quote
 
 import asyncio
 import avro
-import avro.schema
 import io
-import json
 import logging
 import struct
 
@@ -174,85 +172,10 @@ async def get_schema_for_id(self, schema_id: int) -> TypedSchema:
         return schema_typed
 
 
-def flatten_unions(schema: avro.schema.Schema, value: Any) -> Any:
-    """Recursively flattens unions to convert Avro JSON payloads to internal dictionaries
-
-    Data encoded to Avro JSON has a special case for union types, values of these type are encoded as tagged union.
-    The additional tag is not expected to be in the internal data format and has to be removed before further processing.
-    This means the JSON document must be further processed to remove the tag, this function does just that,
-    recursing over the JSON document and handling the tagged unions. To avoid dropping invalid fields here we
-    also do schema validation in this step.
-
-    See also https://avro.apache.org/docs/current/spec.html#json_encoding
-    """
-
-    # this exception is raised when we run into invalid schemas during recursion.
-    validation_exception = InvalidMessageSchema(f"{json.dumps(value)} is no instance of {schema.to_json()}")
-
-    def _flatten_unions(ss: avro.schema.Schema, vv: Any) -> Any:
-        if isinstance(ss, avro.schema.RecordSchema):
-            # expect value to look like {'f.name': val_that_matches_f, ..}
-            if not (isinstance(vv, dict) and {f.name for f in ss.fields}.issuperset(vv.keys())):
-                raise validation_exception
-            return {f.name: _flatten_unions(f.type, vv.get(f.name)) for f in ss.fields}
-        if isinstance(ss, avro.schema.UnionSchema):
-            # expect value to look like {'union_type': union_val} or None
-            if vv is None:
-                if "null" not in (s.name for s in ss.schemas):
-                    raise validation_exception
-                return vv
-            if isinstance(vv, dict):
-                f = next((s for s in ss.schemas if s.name in vv), None)
-                if not f:
-                    raise validation_exception
-                return _flatten_unions(f, vv[f.name])
-            raise validation_exception
-        if isinstance(ss, avro.schema.ArraySchema):
-            # expect value to look like [ val_that_matches_schema, .... ]
-            if not isinstance(vv, list):
-                raise validation_exception
-            return [_flatten_unions(ss.items, v) for v in vv]
-        if isinstance(ss, avro.schema.MapSchema):
-            # expect value to look like { k: val_that_matches_schema, .... }
-            if not (isinstance(vv, dict) and all(isinstance(k, str) for k in vv)):
-                raise validation_exception
-            return {k: _flatten_unions(ss.values, v) for (k, v) in vv.items()}
-        # schema is not recursive, validate it directly
-        if not avro.io.Validate(ss, vv):
-            raise validation_exception
-        return vv
-
-    return _flatten_unions(schema, value)
-
-
-def unflatten_unions(schema: avro.schema.Schema, value: Any) -> Any:
-    """Reverse 'flatten_unions' to convert internal dictionaries into Avro JSON payloads.
-
-    This method performs the reverse operation of 'flatten_unions' and adds the name of the first matching schema
-    in a union as a dictionary key to make it a tagged union again. The data we receive in this step is already
-    validated becaues we have already parsed it from bytes to the internal dictionary. No further validation is
-    needed here.
-
-    See also https://avro.apache.org/docs/current/spec.html#json_encoding
-    """
-    if isinstance(schema, avro.schema.RecordSchema):
-        return {f.name: unflatten_unions(f.type, value.get(f.name)) for f in schema.fields}
-    if isinstance(schema, avro.schema.UnionSchema):
-        if value is None:
-            return value
-        f = next(s for s in schema.schemas if avro.io.Validate(s, value))
-        return {f.name: unflatten_unions(f, value)}
-    if isinstance(schema, avro.schema.ArraySchema):
-        return [unflatten_unions(schema.items, v) for v in value]
-    if isinstance(schema, avro.schema.MapSchema):
-        return {k: unflatten_unions(schema.values, v) for (k, v) in value.items()}
-    return value
-
-
 def read_value(schema: TypedSchema, bio: io.BytesIO):
     if schema.schema_type is SchemaType.AVRO:
         reader = DatumReader(schema.schema)
-        return unflatten_unions(schema.schema, reader.read(BinaryDecoder(bio)))
+        return reader.read(BinaryDecoder(bio))
     if schema.schema_type is SchemaType.JSONSCHEMA:
         value = load(bio)
         try:
@@ -266,7 +189,7 @@ def read_value(schema: TypedSchema, bio: io.BytesIO):
 def write_value(schema: TypedSchema, bio: io.BytesIO, value: dict):
     if schema.schema_type is SchemaType.AVRO:
         writer = DatumWriter(schema.schema)
-        writer.write(flatten_unions(schema.schema, value), BinaryEncoder(bio))
+        writer.write(value, BinaryEncoder(bio))
     elif schema.schema_type is SchemaType.JSONSCHEMA:
         try:
             schema.schema.validate(value)
diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py
index 32dbda686..757f65568 100644
--- a/tests/integration/test_schema.py
+++ b/tests/integration/test_schema.py
@@ -2165,7 +2165,7 @@ async def test_full_transitive_failure(registry_async_client: Client, compatibil
                     }]
                 }
             ],
-            "default": None,
+            "default": "null"
         }]
     }
     evolved = {
@@ -2188,7 +2188,7 @@ async def test_full_transitive_failure(registry_async_client: Client, compatibil
                     }]
                 }
             ],
-            "default": None,
+            "default": "null"
        }]
     }
     await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility})
diff --git a/tests/unit/test_avro_schema.py b/tests/unit/test_avro_schema.py
index 110ba9db6..da08fd70c 100644
--- a/tests/unit/test_avro_schema.py
+++ b/tests/unit/test_avro_schema.py
@@ -4,8 +4,7 @@
 """
 from avro.schema import ArraySchema, Field, MapSchema, RecordSchema, Schema, UnionSchema
 from karapace.avro_compatibility import (
-    parse_avro_schema_definition, ReaderWriterCompatibilityChecker, SchemaCompatibilityResult, SchemaCompatibilityType,
-    ValidateSchemaDefaultsException
+    parse_avro_schema_definition, ReaderWriterCompatibilityChecker, SchemaCompatibilityResult, SchemaCompatibilityType
 )
 
 import json
@@ -33,161 +32,10 @@
     '{"type":"record","name":"myrecord","fields":[{"type":"string","name":"f1"},{"type":"string",'
     '"name":"f2","default":"foo"}]},{"type":"string","name":"f3","default":"bar"}]}'
 )
-
-
-def test_schemaregistry_schema_validation() -> None:
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": ["null", "int"],
-                    "default": 1,
-                }]
-            })
-        )
-
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": [],
-                    "default": 1,
-                }]
-            })
-        )
-
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": {
-                        "name": "z",
-                        "type": "enum",
-                        "symbols": ["A"],
-                        "default": "B"
-                    }
-                }]
-            })
-        )
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": {
-                        "name": "z",
-                        "type": "enum",
-                        "symbols": ["A"]
-                    },
-                    "default": "B"
-                }]
-            })
-        )
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": {
-                        "name": "z",
-                        "type": "enum",
-                        "symbols": ["A"],
-                        "default": "A"
-                    },
-                    "default": "B"
-                }]
-            })
-        )
-
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": "string",
-                    "default": 1
-                }]
-            })
-        )
-
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": "int",
-                    "default": "z"
-                }]
-            })
-        )
-
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(json.dumps({"type": "enum", "name": "x", "symbols": ["A", "B"], "default": "C"}))
-
-    parse_avro_schema_definition(
-        json.dumps({
-            "type": "record",
-            "name": "x",
-            "fields": [{
-                "name": "y",
-                "type": "string",
-                "default": "z"
-            }]
-        })
-    )
-
-    with pytest.raises(ValidateSchemaDefaultsException):
-        parse_avro_schema_definition(
-            json.dumps({
-                "type": "record",
-                "name": "x",
-                "fields": [{
-                    "name": "y",
-                    "type": ["int", "string", "boolean"],
-                    "default": None,
-                }]
-            })
-        )
-
-    parse_avro_schema_definition(
-        json.dumps({
-            "type": "record",
-            "name": "order",
-            "namespace": "example",
-            "fields": [{
-                "name": "someField",
-                "type": [
-                    "string", {
-                        "type": "record",
-                        "name": "someEmbeddedRecord",
-                        "namespace": "example",
-                        "fields": [{
-                            "name": "name",
-                            "type": "string"
-                        }]
-                    }
-                ],
-                "default": "none",
-            }]
-        })
-    )
+badDefaultNullString = parse_avro_schema_definition(
+    '{"type":"record","name":"myrecord","fields":[{"type":["null","string"],"name":"f1","default":'
+    '"null"},{"type":"string","name":"f2","default":"foo"},{"type":"string","name":"f3","default":"bar"}]}'
+)
 
 
 def test_schemaregistry_basic_backwards_compatibility():
diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py
index 0b0c18b22..146a3c745 100644
--- a/tests/unit/test_serialization.py
+++ b/tests/unit/test_serialization.py
@@ -1,8 +1,7 @@
 from karapace.config import read_config
-from karapace.schema_reader import SchemaType, TypedSchema
 from karapace.serialization import (
-    HEADER_FORMAT, InvalidMessageHeader, InvalidMessageSchema, InvalidPayload, read_value, SchemaRegistryDeserializer,
-    SchemaRegistrySerializer, START_BYTE, write_value
+    HEADER_FORMAT, InvalidMessageHeader, InvalidMessageSchema, InvalidPayload, SchemaRegistryDeserializer,
+    SchemaRegistrySerializer, START_BYTE
 )
 from tests.utils import test_objects_avro
 
@@ -38,119 +37,6 @@ async def test_happy_flow(default_config_path, mock_registry_client):
         assert 1 in o.ids_to_schemas
 
 
-def assert_avro_json_round_trip(schema, record):
-    typed_schema = TypedSchema.parse(SchemaType.AVRO, json.dumps(schema))
-    bio = io.BytesIO()
-
-    write_value(typed_schema, bio, record)
-    assert record == read_value(typed_schema, io.BytesIO(bio.getvalue()))
-
-
-def test_avro_json_round_trip():
-    schema = {
-        "namespace": "io.aiven.data",
-        "name": "Test",
-        "type": "record",
-        "fields": [{
-            "name": "attr1",
-            "type": ["null", "string"],
-        }, {
-            "name": "attr2",
-            "type": ["null", "string"],
-        }]
-    }
-    record = {"attr1": {"string": "sample data"}, "attr2": None}
-    assert_avro_json_round_trip(schema, record)
-
-    record = {"attr1": None, "attr2": None}
-    assert_avro_json_round_trip(schema, record)
-
-    schema = {
-        "type": "array",
-        "items": {
-            "namespace": "io.aiven.data",
-            "name": "Test",
-            "type": "record",
-            "fields": [{
-                "name": "attr",
-                "type": ["null", "string"],
-            }]
-        }
-    }
-    record = [{"attr": {"string": "sample data"}}]
-    assert_avro_json_round_trip(schema, record)
-    record = [{"attr": None}]
-    assert_avro_json_round_trip(schema, record)
-
-    schema = {
-        "type": "map",
-        "values": {
-            "namespace": "io.aiven.data",
-            "name": "Test",
-            "type": "record",
-            "fields": [{
-                "name": "attr1",
-                "type": ["null", "string"],
-            }]
-        }
-    }
-    record = {"foo": {"attr1": {"string": "sample data"}}}
-    assert_avro_json_round_trip(schema, record)
-
-    schema = {"type": "array", "items": ["null", "string", "int"]}
-    record = [{"string": "foo"}, None, {"int": 1}]
-    assert_avro_json_round_trip(schema, record)
-
-
-def assert_avro_json_write_invalid(schema, records):
-    typed_schema = TypedSchema.parse(SchemaType.AVRO, json.dumps(schema))
-    bio = io.BytesIO()
-
-    for record in records:
-        with pytest.raises(InvalidMessageSchema):
-            write_value(typed_schema, bio, record)
-
-
-def test_avro_json_write_invalid():
-    schema = {
-        "namespace": "io.aiven.data",
-        "name": "Test",
-        "type": "record",
-        "fields": [{
-            "name": "attr",
-            "type": ["null", "string"],
-        }]
-    }
-    records = [
-        {
-            "attr": {
-                "string": 5
-            }
-        },
-        {
-            "attr": {
-                "foo": "bar"
-            }
-        },
-        {
-            "foo": "bar"
-        },
-    ]
-    assert_avro_json_write_invalid(schema, records)
-
-    schema = {"type": "array", "items": ["string", "int"]}
-    records = [
-        [{
-            "string": 1
-        }],
-        [1, 2],
-        [{
-            "string": 1
-        }, 1, "foo"],
-    ]
-    assert_avro_json_write_invalid(schema, records)
-
-
 async def test_serialization_fails(default_config_path, mock_registry_client):
     serializer, _ = await make_ser_deser(default_config_path, mock_registry_client)
     with pytest.raises(InvalidMessageSchema):