Skip to content

Commit

Permalink
Merge pull request #294 from aiven/revert-avro-validation
Browse files Browse the repository at this point in the history
Revert avro validation
  • Loading branch information
tvainika authored Dec 16, 2021
2 parents cefda17 + 922fc65 commit 8b235f0
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 433 deletions.
79 changes: 3 additions & 76 deletions karapace/avro_compatibility.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from avro.io import Validate # type: ignore
from avro.schema import ( # type: ignore
from avro.schema import (
ARRAY, ArraySchema, BOOLEAN, BYTES, DOUBLE, ENUM, EnumSchema, Field, FIXED, FixedSchema, FLOAT, INT, LONG, MAP,
MapSchema, NamedSchema, Names, NULL, RECORD, RecordSchema, Schema, SchemaFromJSONData, STRING, UNION, UnionSchema
)
Expand Down Expand Up @@ -27,80 +26,8 @@ def parse_avro_schema_definition(s: str) -> Schema:

json_data = json.loads(s[:e.pos])

schema = SchemaFromJSONData(json_data, Names())

validate_schema_defaults(schema)
return schema


class ValidateSchemaDefaultsException(Exception):
def __init__(self, schema: Schema, default: Any, path: List[str]) -> None:
prefix = ": ".join(path)
msg = f"{prefix}: default {default} does not match schema {schema.to_json()}"
super().__init__(msg)


def validate_schema_defaults(schema: Schema) -> None:
""" This function validates that the defaults that are defined in the schema actually
match their schema. We try to build a proper error message internally and then throw it
to the user as a readable exception message.
Consider for example the schema: {'type': 'enum', 'symbols': ['A','B'], 'default':'C'}
"""

def _validate_field_default(f: Field, acc: List[str]) -> None:
"""This function validates that the default for a field matches the field type
From the Avro docs: (Note that when a default value is specified for a record
field whose type is a union, the type of the default value must match the first
element of the union. Thus, for unions containing "null", the "null" is usually
listed first, since the default value of such unions is typically null.)
"""

if not f.has_default:
return

if isinstance(f.type, UnionSchema):
# select the first schema from the union to validate, if it's empty just
# raise an exception because no default should match anyway
if len(f.type.schemas) == 0:
# if the union is empty; no default should match
raise ValidateSchemaDefaultsException(f.type, f.default, acc)
s = f.type.schemas[0]
else:
s = f.type

if not Validate(s, f.default):
raise ValidateSchemaDefaultsException(f.type, f.default, acc)

def _validation_crumb(s: Schema) -> str:
if hasattr(s, 'name'):
return f"bad {s.type} '{s.name}'"
return f"bad {s.type}"

def _validate_schema_defaults(s: Schema, acc: List[str]) -> None:
_acc = [*acc, _validation_crumb(s)]

if "default" in s.props:
default = s.props.get("default")
if not Validate(s, default):
raise ValidateSchemaDefaultsException(s, default, _acc)

if isinstance(s, RecordSchema):
# fields do need to be unwrapped
for f in s.fields:
_field_acc = [*_acc, f"bad field: '{f.name}'"]
_validate_schema_defaults(f.type, _field_acc)
_validate_field_default(f, _field_acc)
if isinstance(s, ArraySchema):
_validate_schema_defaults(s.items, _acc)
if isinstance(s, MapSchema):
_validate_schema_defaults(s.values, _acc)
if isinstance(s, UnionSchema):
for u in s.schemas:
_validate_schema_defaults(u, _acc)

_validate_schema_defaults(schema, [])
names = Names()
return SchemaFromJSONData(json_data, names)


def is_incompatible(result: "SchemaCompatibilityResult") -> bool:
Expand Down
4 changes: 2 additions & 2 deletions karapace/compatibility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
Copyright (c) 2019 Aiven Ltd
See LICENSE for details
"""
from avro.schema import Schema as AvroSchema # type: ignore
from avro.schema import Schema as AvroSchema
from enum import Enum, unique
from jsonschema import Draft7Validator # type: ignore
from jsonschema import Draft7Validator
from karapace.avro_compatibility import (
ReaderWriterCompatibilityChecker as AvroChecker, SchemaCompatibilityResult, SchemaCompatibilityType,
SchemaIncompatibilityType
Expand Down
83 changes: 3 additions & 80 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
from jsonschema import ValidationError
from karapace.schema_reader import InvalidSchema, SchemaType, TypedSchema
from karapace.utils import Client, json_encode
from typing import Any, Dict, Optional
from typing import Dict, Optional
from urllib.parse import quote

import asyncio
import avro
import avro.schema
import io
import json
import logging
import struct

Expand Down Expand Up @@ -174,85 +172,10 @@ async def get_schema_for_id(self, schema_id: int) -> TypedSchema:
return schema_typed


def flatten_unions(schema: avro.schema.Schema, value: Any) -> Any:
"""Recursively flattens unions to convert Avro JSON payloads to internal dictionaries
Data encoded to Avro JSON has a special case for union types, values of these type are encoded as tagged union.
The additional tag is not expected to be in the internal data format and has to be removed before further processing.
This means the JSON document must be further processed to remove the tag, this function does just that,
recursing over the JSON document and handling the tagged unions. To avoid dropping invalid fields here we
also do schema validation in this step.
See also https://avro.apache.org/docs/current/spec.html#json_encoding
"""

# this exception is raised when we run into invalid schemas during recursion.
validation_exception = InvalidMessageSchema(f"{json.dumps(value)} is no instance of {schema.to_json()}")

def _flatten_unions(ss: avro.schema.Schema, vv: Any) -> Any:
if isinstance(ss, avro.schema.RecordSchema):
# expect value to look like {'f.name': val_that_matches_f, ..}
if not (isinstance(vv, dict) and {f.name for f in ss.fields}.issuperset(vv.keys())):
raise validation_exception
return {f.name: _flatten_unions(f.type, vv.get(f.name)) for f in ss.fields}
if isinstance(ss, avro.schema.UnionSchema):
# expect value to look like {'union_type': union_val} or None
if vv is None:
if "null" not in (s.name for s in ss.schemas):
raise validation_exception
return vv
if isinstance(vv, dict):
f = next((s for s in ss.schemas if s.name in vv), None)
if not f:
raise validation_exception
return _flatten_unions(f, vv[f.name])
raise validation_exception
if isinstance(ss, avro.schema.ArraySchema):
# expect value to look like [ val_that_matches_schema, .... ]
if not isinstance(vv, list):
raise validation_exception
return [_flatten_unions(ss.items, v) for v in vv]
if isinstance(ss, avro.schema.MapSchema):
# expect value to look like { k: val_that_matches_schema, .... }
if not (isinstance(vv, dict) and all(isinstance(k, str) for k in vv)):
raise validation_exception
return {k: _flatten_unions(ss.values, v) for (k, v) in vv.items()}
# schema is not recursive, validate it directly
if not avro.io.Validate(ss, vv):
raise validation_exception
return vv

return _flatten_unions(schema, value)


def unflatten_unions(schema: avro.schema.Schema, value: Any) -> Any:
"""Reverse 'flatten_unions' to convert internal dictionaries into Avro JSON payloads.
This method performs the reverse operation of 'flatten_unions' and adds the name of the first matching schema
in a union as a dictionary key to make it a tagged union again. The data we receive in this step is already
validated because we have already parsed it from bytes to the internal dictionary. No further validation is
needed here.
See also https://avro.apache.org/docs/current/spec.html#json_encoding
"""
if isinstance(schema, avro.schema.RecordSchema):
return {f.name: unflatten_unions(f.type, value.get(f.name)) for f in schema.fields}
if isinstance(schema, avro.schema.UnionSchema):
if value is None:
return value
f = next(s for s in schema.schemas if avro.io.Validate(s, value))
return {f.name: unflatten_unions(f, value)}
if isinstance(schema, avro.schema.ArraySchema):
return [unflatten_unions(schema.items, v) for v in value]
if isinstance(schema, avro.schema.MapSchema):
return {k: unflatten_unions(schema.values, v) for (k, v) in value.items()}
return value


def read_value(schema: TypedSchema, bio: io.BytesIO):
if schema.schema_type is SchemaType.AVRO:
reader = DatumReader(schema.schema)
return unflatten_unions(schema.schema, reader.read(BinaryDecoder(bio)))
return reader.read(BinaryDecoder(bio))
if schema.schema_type is SchemaType.JSONSCHEMA:
value = load(bio)
try:
Expand All @@ -266,7 +189,7 @@ def read_value(schema: TypedSchema, bio: io.BytesIO):
def write_value(schema: TypedSchema, bio: io.BytesIO, value: dict):
if schema.schema_type is SchemaType.AVRO:
writer = DatumWriter(schema.schema)
writer.write(flatten_unions(schema.schema, value), BinaryEncoder(bio))
writer.write(value, BinaryEncoder(bio))
elif schema.schema_type is SchemaType.JSONSCHEMA:
try:
schema.schema.validate(value)
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2165,7 +2165,7 @@ async def test_full_transitive_failure(registry_async_client: Client, compatibil
}]
}
],
"default": None,
"default": "null"
}]
}
evolved = {
Expand All @@ -2188,7 +2188,7 @@ async def test_full_transitive_failure(registry_async_client: Client, compatibil
}]
}
],
"default": None,
"default": "null"
}]
}
await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility})
Expand Down
Loading

0 comments on commit 8b235f0

Please sign in to comment.