Skip to content

Commit

Permalink
feat: preserve unrecognized logical types and properties (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump authored Oct 11, 2024
1 parent 4966106 commit 631f6dd
Show file tree
Hide file tree
Showing 7 changed files with 823 additions and 45 deletions.
3 changes: 0 additions & 3 deletions gen/testdata/golden.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,13 @@
{
"name": "mapOfStrings",
"type": {
"name": "aMapOfStrings",
"type": "map",
"values": "string"
}
},
{
"name": "mapOfRecords",
"type": {
"name": "aMapOfRecords",
"type": "map",
"values": {
"name": "RecordInMap",
Expand Down Expand Up @@ -175,7 +173,6 @@
{
"name": "aRecordArray",
"type": {
"name": "someRecordArray",
"type": "array",
"items": {
"name": "recordInArray",
Expand Down
1 change: 1 addition & 0 deletions ocf/ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,7 @@ func TestWithSchemaMarshaler(t *testing.T) {
"name": "meta",
"type": {
"type": "array",
"logicalType": "map",
"items": {
"type": "record",
"name": "FooMetadataEntry",
Expand Down
3 changes: 2 additions & 1 deletion ocf/testdata/full-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"field-id": 5
}
]
}
},
"logicalType": "map"
},
"field-id": 3
}
Expand Down
73 changes: 60 additions & 13 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,21 @@ func (nullDefaultType) MarshalJSON() ([]byte, error) {
var nullDefault nullDefaultType = struct{}{}

var (
schemaReserved = []string{
"doc", "fields", "items", "name", "namespace", "size", "symbols",
"values", "type", "aliases", "logicalType", "precision", "scale",
}
fieldReserved = []string{"default", "doc", "name", "order", "type", "aliases"}
// Note: order matches the order of properties as they are named in the spec.
// https://avro.apache.org/docs/1.12.0/specification
recordReserved = []string{"type", "name", "namespace", "doc", "aliases", "fields"}
fieldReserved = []string{"name", "doc", "type", "order", "aliases", "default"}
enumReserved = []string{"type", "name", "namespace", "aliases", "doc", "symbols", "default"}
arrayReserved = []string{"type", "items"}
mapReserved = []string{"type", "values"}
fixedReserved = []string{"type", "name", "namespace", "aliases", "size"}
fixedWithLogicalTypeReserved = []string{"type", "name", "namespace", "aliases", "size", "logicalType"}
fixedWithDecimalTypeReserved = []string{
"type", "name", "namespace", "aliases", "size", "logicalType", "precision", "scale",
}
primitiveReserved = []string{"type"}
primitiveWithLogicalTypeReserved = []string{"type", "logicalType"}
primitiveWithDecimalTypeReserved = []string{"type", "logicalType", "precision", "scale"}
)

// Type is a schema type.
Expand Down Expand Up @@ -482,9 +492,16 @@ func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *Primitiv
for _, opt := range opts {
opt(&cfg)
}

reservedProps := primitiveReserved
if l != nil {
if l.Type() == Decimal {
reservedProps = primitiveWithDecimalTypeReserved
} else {
reservedProps = primitiveWithLogicalTypeReserved
}
}
return &PrimitiveSchema{
properties: newProperties(cfg.props, schemaReserved),
properties: newProperties(cfg.props, reservedProps),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
typ: t,
logical: l,
Expand Down Expand Up @@ -574,7 +591,7 @@ func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOpti

return &RecordSchema{
name: n,
properties: newProperties(cfg.props, schemaReserved),
properties: newProperties(cfg.props, recordReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
fields: fields,
doc: cfg.doc,
Expand Down Expand Up @@ -919,7 +936,7 @@ func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOptio

return &EnumSchema{
name: n,
properties: newProperties(cfg.props, schemaReserved),
properties: newProperties(cfg.props, enumReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
symbols: symbols,
def: def,
Expand Down Expand Up @@ -1072,7 +1089,7 @@ func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema {
}

return &ArraySchema{
properties: newProperties(cfg.props, schemaReserved),
properties: newProperties(cfg.props, arrayReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
items: items,
}
Expand Down Expand Up @@ -1142,7 +1159,7 @@ func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema {
}

return &MapSchema{
properties: newProperties(cfg.props, schemaReserved),
properties: newProperties(cfg.props, mapReserved),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
values: values,
}
Expand Down Expand Up @@ -1323,9 +1340,17 @@ func NewFixedSchema(
return nil, err
}

reservedProps := fixedReserved
if logical != nil {
if logical.Type() == Decimal {
reservedProps = fixedWithDecimalTypeReserved
} else {
reservedProps = fixedWithLogicalTypeReserved
}
}
return &FixedSchema{
name: n,
properties: newProperties(cfg.props, schemaReserved),
properties: newProperties(cfg.props, reservedProps),
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
size: size,
logical: logical,
Expand Down Expand Up @@ -1406,9 +1431,22 @@ func (s *FixedSchema) CacheFingerprint() [32]byte {

// NullSchema is an Avro null type schema.
type NullSchema struct {
properties
fingerprinter
}

// NewNullSchema creates a new NullSchema.
func NewNullSchema(opts ...SchemaOption) *NullSchema {
var cfg schemaConfig
for _, opt := range opts {
opt(&cfg)
}

return &NullSchema{
properties: newProperties(cfg.props, primitiveReserved),
}
}

// Type returns the type of the schema.
func (s *NullSchema) Type() Type {
return Null
Expand All @@ -1421,7 +1459,16 @@ func (s *NullSchema) String() string {

// MarshalJSON marshals the schema to json.
func (s *NullSchema) MarshalJSON() ([]byte, error) {
return []byte(`"null"`), nil
if len(s.props) == 0 {
return []byte(`"null"`), nil
}
buf := new(bytes.Buffer)
buf.WriteString(`{"type":"null"`)
if err := s.marshalPropertiesToJSON(buf); err != nil {
return nil, err
}
buf.WriteString("}")
return buf.Bytes(), nil
}

// Fingerprint returns the SHA256 fingerprint of the schema.
Expand Down
4 changes: 4 additions & 0 deletions schema_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func TestSchema_JSON(t *testing.T) {
input: `{"type":"null"}`,
json: `"null"`,
},
{
input: `{"type":"null","other":"foo"}`,
json: `{"type":"null","other":"foo"}`,
},
{
input: `"boolean"`,
json: `"boolean"`,
Expand Down
97 changes: 69 additions & 28 deletions schema_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func parsePrimitiveType(namespace, s string, cache *SchemaCache) (Schema, error)

func parseComplexType(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
if val, ok := m["type"].([]any); ok {
// Note: According to the spec, this is not allowed:
// https://avro.apache.org/docs/1.12.0/specification/#schema-declaration
// The "type" property in an object must be a string. A union type will be a slice,
// but NOT an object with a "type" property that is a slice.
// Might be advisable to remove this call (tradeoff between better conformance
// with the spec vs. possible backwards-compatibility issue).
return parseUnion(namespace, val, seen, cache)
}

Expand All @@ -131,10 +137,7 @@ func parseComplexType(namespace string, m map[string]any, seen seenCache, cache
typ := Type(str)

switch typ {
case Null:
return &NullSchema{}, nil

case String, Bytes, Int, Long, Float, Double, Boolean:
case String, Bytes, Int, Long, Float, Double, Boolean, Null:
return parsePrimitive(typ, m)

case Record, Error:
Expand All @@ -158,14 +161,15 @@ func parseComplexType(namespace string, m map[string]any, seen seenCache, cache
}

type primitiveSchema struct {
LogicalType string `mapstructure:"logicalType"`
Precision int `mapstructure:"precision"`
Scale int `mapstructure:"scale"`
Props map[string]any `mapstructure:",remain"`
Type string `mapstructure:"type"`
Props map[string]any `mapstructure:",remain"`
}

func parsePrimitive(typ Type, m map[string]any) (Schema, error) {
if m == nil {
if len(m) == 0 {
if typ == Null {
return &NullSchema{}, nil
}
return NewPrimitiveSchema(typ, nil), nil
}

Expand All @@ -178,14 +182,20 @@ func parsePrimitive(typ Type, m map[string]any) (Schema, error) {
}

var logical LogicalSchema
if p.LogicalType != "" {
logical = parsePrimitiveLogicalType(typ, p.LogicalType, p.Precision, p.Scale)
if logicalType := logicalTypeProperty(p.Props); logicalType != "" {
logical = parsePrimitiveLogicalType(typ, logicalType, p.Props)
if logical != nil {
delete(p.Props, "logicalType")
}
}

if typ == Null {
return NewNullSchema(WithProps(p.Props)), nil
}
return NewPrimitiveSchema(typ, logical, WithProps(p.Props)), nil
}

func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSchema {
func parsePrimitiveLogicalType(typ Type, lt string, props map[string]any) LogicalSchema {
ltyp := LogicalType(lt)
if (typ == String && ltyp == UUID) ||
(typ == Int && ltyp == Date) ||
Expand All @@ -199,10 +209,10 @@ func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSche
}

if typ == Bytes && ltyp == Decimal {
return parseDecimalLogicalType(-1, prec, scale)
return parseDecimalLogicalType(-1, props)
}

return nil
return nil // otherwise, not a recognized logical type
}

type recordSchema struct {
Expand Down Expand Up @@ -368,6 +378,7 @@ func parseEnum(namespace string, m map[string]any, seen seenCache, cache *Schema
}

type arraySchema struct {
Type string `mapstructure:"type"`
Items any `mapstructure:"items"`
Props map[string]any `mapstructure:",remain"`
}
Expand All @@ -393,6 +404,7 @@ func parseArray(namespace string, m map[string]any, seen seenCache, cache *Schem
}

type mapSchema struct {
Type string `mapstructure:"type"`
Values any `mapstructure:"values"`
Props map[string]any `mapstructure:",remain"`
}
Expand Down Expand Up @@ -431,15 +443,12 @@ func parseUnion(namespace string, v []any, seen seenCache, cache *SchemaCache) (
}

type fixedSchema struct {
Name string `mapstructure:"name"`
Namespace string `mapstructure:"namespace"`
Aliases []string `mapstructure:"aliases"`
Type string `mapstructure:"type"`
Size int `mapstructure:"size"`
LogicalType string `mapstructure:"logicalType"`
Precision int `mapstructure:"precision"`
Scale int `mapstructure:"scale"`
Props map[string]any `mapstructure:",remain"`
Name string `mapstructure:"name"`
Namespace string `mapstructure:"namespace"`
Aliases []string `mapstructure:"aliases"`
Type string `mapstructure:"type"`
Size int `mapstructure:"size"`
Props map[string]any `mapstructure:",remain"`
}

func parseFixed(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
Expand All @@ -463,8 +472,11 @@ func parseFixed(namespace string, m map[string]any, seen seenCache, cache *Schem
}

var logical LogicalSchema
if f.LogicalType != "" {
logical = parseFixedLogicalType(f.Size, f.LogicalType, f.Precision, f.Scale)
if logicalType := logicalTypeProperty(f.Props); logicalType != "" {
logical = parseFixedLogicalType(f.Size, logicalType, f.Props)
if logical != nil {
delete(f.Props, "logicalType")
}
}

fixed, err := NewFixedSchema(f.Name, f.Namespace, f.Size, logical, WithAliases(f.Aliases), WithProps(f.Props))
Expand All @@ -485,19 +497,41 @@ func parseFixed(namespace string, m map[string]any, seen seenCache, cache *Schem
return fixed, nil
}

func parseFixedLogicalType(size int, lt string, prec, scale int) LogicalSchema {
func parseFixedLogicalType(size int, lt string, props map[string]any) LogicalSchema {
ltyp := LogicalType(lt)
switch {
case ltyp == Duration && size == 12:
return NewPrimitiveLogicalSchema(Duration)
case ltyp == Decimal:
return parseDecimalLogicalType(size, prec, scale)
return parseDecimalLogicalType(size, props)
}

return nil
}

func parseDecimalLogicalType(size, prec, scale int) LogicalSchema {
type decimalSchema struct {
Precision int `mapstructure:"precision"`
Scale int `mapstructure:"scale"`
}

func parseDecimalLogicalType(size int, props map[string]any) LogicalSchema {
var (
d decimalSchema
meta mapstructure.Metadata
)
if err := decodeMap(props, &d, &meta); err != nil {
return nil
}
decType := newDecimalLogicalType(size, d.Precision, d.Scale)
if decType != nil {
// Remove the properties that we consumed
delete(props, "precision")
delete(props, "scale")
}
return decType
}

func newDecimalLogicalType(size, prec, scale int) LogicalSchema {
if prec <= 0 {
return nil
}
Expand Down Expand Up @@ -594,3 +628,10 @@ func (c seenCache) Add(name string) error {
c[name] = struct{}{}
return nil
}

func logicalTypeProperty(props map[string]any) string {
if lt, ok := props["logicalType"].(string); ok {
return lt
}
return ""
}
Loading

0 comments on commit 631f6dd

Please sign in to comment.