From 0f0aad60e4e242a30aa2db6b52d90da95f6cd9d4 Mon Sep 17 00:00:00 2001 From: Jonas Weile Date: Tue, 10 Sep 2024 11:11:44 +0000 Subject: [PATCH 1/5] add custom logical types --- config_internal_test.go | 27 +++++++++ schema.go | 128 +++++++++++++++++++++++++++++++++++++--- schema_parse.go | 71 ++++++++++++---------- schema_test.go | 47 +++++++++++++++ 4 files changed, 233 insertions(+), 40 deletions(-) diff --git a/config_internal_test.go b/config_internal_test.go index 4d14625..1ccf5ec 100644 --- a/config_internal_test.go +++ b/config_internal_test.go @@ -182,3 +182,30 @@ func TestConfig_DisableCache_DoesNotReuseEncoders(t *testing.T) { assert.NotSame(t, enc1, enc2) } + +func TestConfig_RegisterCustomLogicalType(t *testing.T) { + type testObj struct { + A int64 `avro:"a"` + } + + api := Config{ + TagKey: "test", + BlockLength: 2, + DisableCaching: true, + }.Freeze() + cfg := api.(*frozenConfig) + + schema := MustParse(`{ + "type": "record", + "name": "test", + "fields" : [ + {"name": "a", "type": "long"} + ] +}`) + typ := reflect2.TypeOfPtr(testObj{}) + + enc1 := cfg.EncoderOf(schema, typ) + enc2 := cfg.EncoderOf(schema, typ) + + assert.NotSame(t, enc1, enc2) +} diff --git a/schema.go b/schema.go index d2b93df..ccc4010 100644 --- a/schema.go +++ b/schema.go @@ -76,6 +76,52 @@ const ( Duration LogicalType = "duration" ) +// customLogicalSchema is a custom logical type schema that is not part of the Avro specification. +// It wraps a primitive type schema and thus supports no additional properties. +type customLogicalSchema struct { + PrimitiveLogicalSchema +} + +var customLogicalSchemas sync.Map // map[LogicalType]*CustomLogicalSchema + +func addCustomLogicalSchema(ltyp LogicalType) { + customLogicalSchemas.Store(ltyp, &customLogicalSchema{ + PrimitiveLogicalSchema: PrimitiveLogicalSchema{typ: ltyp}, + }) +} + +func getCustomLogicalSchema(ltyp LogicalType) LogicalSchema { + if ls, ok := customLogicalSchemas.Load(ltyp); ok { + return ls.(*customLogicalSchema) + } + return nil +} + +func RegisterCustomLogicalType(ltyp LogicalType) error { + // Ensure that the custom logical type does not overwrite a primitive type + switch ltyp { + case Decimal, + UUID, + Date, + TimeMillis, + TimeMicros, + TimestampMillis, + TimestampMicros, + LocalTimestampMillis, + LocalTimestampMicros, + Duration: + return errors.New("logical type conflicts with a predefined logical type") + } + + // Ensure that the custom logical schema has not already been registered + if ls := getCustomLogicalSchema(ltyp); ls != nil { + return errors.New("logical type has already been registered") + } + + addCustomLogicalSchema(ltyp) + return nil +} + // Action is a field action used during decoding process. type Action string @@ -396,12 +442,13 @@ func (p properties) marshalPropertiesToJSON(buf *bytes.Buffer) error { } type schemaConfig struct { - aliases []string - doc string - def any - order Order - props map[string]any - wfp *[32]byte + aliases []string + doc string + def any + order Order + props map[string]any + wfp *[32]byte + customLogicalSchema LogicalSchema } // SchemaOption is a function that sets a schema option. @@ -414,6 +461,13 @@ func WithAliases(aliases []string) SchemaOption { } } +// WithCustomLogicalType sets the logical type on a schema. +func WithCustomLogicalType(ltyp LogicalType) SchemaOption { + return func(opts *schemaConfig) { + opts.customLogicalSchema = getCustomLogicalSchema(ltyp) + } +} + // WithDoc sets the doc on a schema. func WithDoc(doc string) SchemaOption { return func(opts *schemaConfig) { @@ -477,6 +531,11 @@ func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *Primitiv opt(&cfg) } + // If the logical schema is nil, use the custom logical schema. + if l == nil { + l = cfg.customLogicalSchema + } + return &PrimitiveSchema{ properties: newProperties(cfg.props, schemaReserved), cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, @@ -552,6 +611,7 @@ type RecordSchema struct { isError bool fields []*Field doc string + logical LogicalSchema } // NewRecordSchema creates a new record schema instance. @@ -572,6 +632,7 @@ func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOpti cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, fields: fields, doc: cfg.doc, + logical: cfg.customLogicalSchema, }, nil } @@ -592,6 +653,10 @@ func (s *RecordSchema) Type() Type { return Record } +func (s *RecordSchema) Logical() LogicalSchema { + return s.logical +} + // Doc returns the documentation of a record. func (s *RecordSchema) Doc() string { return s.doc @@ -622,6 +687,10 @@ func (s *RecordSchema) String() string { fields = fields[:len(fields)-1] } + if s.logical != nil { + return `{"name":"` + s.FullName() + `","type":"` + typ + `,"fields":[` + fields + `]` + `",` + s.logical.String() + `}` + } + return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}` } @@ -659,6 +728,9 @@ func (s *RecordSchema) MarshalJSON() ([]byte, error) { if err := s.marshalPropertiesToJSON(buf); err != nil { return nil, err } + if s.logical != nil { + buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`) + } buf.WriteString("}") return buf.Bytes(), nil } @@ -876,6 +948,7 @@ type EnumSchema struct { symbols []string def string doc string + logical LogicalSchema // encodedSymbols is the symbols of the encoded value. // It's only used in the context of write-read schema resolution. @@ -918,6 +991,7 @@ func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOptio symbols: symbols, def: def, doc: cfg.doc, + logical: cfg.customLogicalSchema, }, nil } @@ -979,6 +1053,11 @@ func (s *EnumSchema) HasDefault() bool { return s.def != "" } +// Logical returns the logical schema or nil. +func (s *EnumSchema) Logical() LogicalSchema { + return s.logical +} + // String returns the canonical form of the schema. func (s *EnumSchema) String() string { symbols := "" @@ -989,6 +1068,10 @@ func (s *EnumSchema) String() string { symbols = symbols[:len(symbols)-1] } + if s.logical != nil { + return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]` + `",` + s.logical.String() + `}` + } + return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}` } @@ -1025,6 +1108,9 @@ func (s *EnumSchema) MarshalJSON() ([]byte, error) { if err := s.marshalPropertiesToJSON(buf); err != nil { return nil, err } + if s.logical != nil { + buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`) + } buf.WriteString("}") return buf.Bytes(), nil } @@ -1055,7 +1141,8 @@ type ArraySchema struct { fingerprinter cacheFingerprinter - items Schema + items Schema + logical LogicalSchema } // NewArraySchema creates an array schema instance. @@ -1069,6 +1156,7 @@ func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema { properties: newProperties(cfg.props, schemaReserved), cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, items: items, + logical: cfg.customLogicalSchema, } } @@ -1082,8 +1170,16 @@ func (s *ArraySchema) Items() Schema { return s.items } +// Logical returns the logical schema or nil. +func (s *ArraySchema) Logical() LogicalSchema { + return s.logical +} + // String returns the canonical form of the schema. func (s *ArraySchema) String() string { + if s.logical != nil { + return `{"type":"array","items":` + s.items.String() + `,"` + s.logical.String() + `"}` + } return `{"type":"array","items":` + s.items.String() + `}` } @@ -1100,6 +1196,9 @@ func (s *ArraySchema) MarshalJSON() ([]byte, error) { if err = s.marshalPropertiesToJSON(buf); err != nil { return nil, err } + if s.logical != nil { + buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`) + } buf.WriteString("}") return buf.Bytes(), nil } @@ -1125,7 +1224,8 @@ type MapSchema struct { fingerprinter cacheFingerprinter - values Schema + values Schema + logical LogicalSchema } // NewMapSchema creates a map schema instance. @@ -1139,6 +1239,7 @@ func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema { properties: newProperties(cfg.props, schemaReserved), cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, values: values, + logical: cfg.customLogicalSchema, } } @@ -1152,8 +1253,16 @@ func (s *MapSchema) Values() Schema { return s.values } +// Logical returns the logical schema or nil. +func (s *MapSchema) Logical() LogicalSchema { + return s.logical +} + // String returns the canonical form of the schema. func (s *MapSchema) String() string { + if s.logical != nil { + return `{"type":"map","values":` + s.values.String() + `,"` + s.logical.String() + `"}` + } return `{"type":"map","values":` + s.values.String() + `}` } @@ -1170,6 +1279,9 @@ func (s *MapSchema) MarshalJSON() ([]byte, error) { if err := s.marshalPropertiesToJSON(buf); err != nil { return nil, err } + if s.logical != nil { + buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`) + } buf.WriteString("}") return buf.Bytes(), nil } diff --git a/schema_parse.go b/schema_parse.go index 3022116..3874cfe 100644 --- a/schema_parse.go +++ b/schema_parse.go @@ -202,17 +202,18 @@ func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSche return parseDecimalLogicalType(-1, prec, scale) } - return nil + return getCustomLogicalSchema(ltyp) } type recordSchema struct { - Type string `mapstructure:"type"` - Name string `mapstructure:"name"` - Namespace string `mapstructure:"namespace"` - Aliases []string `mapstructure:"aliases"` - Doc string `mapstructure:"doc"` - Fields []map[string]any `mapstructure:"fields"` - Props map[string]any `mapstructure:",remain"` + Type string `mapstructure:"type"` + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Doc string `mapstructure:"doc"` + Fields []map[string]any `mapstructure:"fields"` + LogicalType string `mapstructure:"logicalType"` + Props map[string]any `mapstructure:",remain"` } func parseRecord(typ Type, namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { @@ -243,11 +244,11 @@ func parseRecord(typ Type, namespace string, m map[string]any, seen seenCache, c switch typ { case Record: rec, err = NewRecordSchema(r.Name, r.Namespace, fields, - WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), + WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), WithCustomLogicalType(LogicalType(r.LogicalType)), ) case Error: rec, err = NewErrorRecordSchema(r.Name, r.Namespace, fields, - WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), + WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), WithCustomLogicalType(LogicalType(r.LogicalType)), ) } if err != nil { @@ -276,13 +277,14 @@ func parseRecord(typ Type, namespace string, m map[string]any, seen seenCache, c } type fieldSchema struct { - Name string `mapstructure:"name"` - Aliases []string `mapstructure:"aliases"` - Type any `mapstructure:"type"` - Doc string `mapstructure:"doc"` - Default any `mapstructure:"default"` - Order Order `mapstructure:"order"` - Props map[string]any `mapstructure:",remain"` + Name string `mapstructure:"name"` + Aliases []string `mapstructure:"aliases"` + Type any `mapstructure:"type"` + Doc string `mapstructure:"doc"` + Default any `mapstructure:"default"` + Order Order `mapstructure:"order"` + LogicalType string `mapstructure:"logicalType"` + Props map[string]any `mapstructure:",remain"` } func parseField(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (*Field, error) { @@ -312,6 +314,7 @@ func parseField(namespace string, m map[string]any, seen seenCache, cache *Schem field, err := NewField(f.Name, typ, WithDefault(f.Default), WithAliases(f.Aliases), WithDoc(f.Doc), WithOrder(f.Order), WithProps(f.Props), + WithCustomLogicalType(LogicalType(f.LogicalType)), ) if err != nil { return nil, err @@ -321,14 +324,15 @@ func parseField(namespace string, m map[string]any, seen seenCache, cache *Schem } type enumSchema struct { - Name string `mapstructure:"name"` - Namespace string `mapstructure:"namespace"` - Aliases []string `mapstructure:"aliases"` - Type string `mapstructure:"type"` - Doc string `mapstructure:"doc"` - Symbols []string `mapstructure:"symbols"` - Default string `mapstructure:"default"` - Props map[string]any `mapstructure:",remain"` + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Type string `mapstructure:"type"` + Doc string `mapstructure:"doc"` + Symbols []string `mapstructure:"symbols"` + Default string `mapstructure:"default"` + LogicalType string `mapstructure:"logicalType"` + Props map[string]any `mapstructure:",remain"` } func parseEnum(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { @@ -349,6 +353,7 @@ func parseEnum(namespace string, m map[string]any, seen seenCache, cache *Schema enum, err := NewEnumSchema(e.Name, e.Namespace, e.Symbols, WithDefault(e.Default), WithAliases(e.Aliases), WithDoc(e.Doc), WithProps(e.Props), + WithCustomLogicalType(LogicalType(e.LogicalType)), ) if err != nil { return nil, err @@ -368,8 +373,9 @@ func parseEnum(namespace string, m map[string]any, seen seenCache, cache *Schema } type arraySchema struct { - Items any `mapstructure:"items"` - Props map[string]any `mapstructure:",remain"` + Items any `mapstructure:"items"` + LogicalType string `mapstructure:"logicalType"` + Props map[string]any `mapstructure:",remain"` } func parseArray(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { @@ -389,12 +395,13 @@ func parseArray(namespace string, m map[string]any, seen seenCache, cache *Schem return nil, err } - return NewArraySchema(schema, WithProps(a.Props)), nil + return NewArraySchema(schema, WithProps(a.Props), WithCustomLogicalType(LogicalType(a.LogicalType))), nil } type mapSchema struct { - Values any `mapstructure:"values"` - Props map[string]any `mapstructure:",remain"` + Values any `mapstructure:"values"` + LogicalType string `mapstructure:"logicalType"` + Props map[string]any `mapstructure:",remain"` } func parseMap(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { @@ -414,7 +421,7 @@ func parseMap(namespace string, m map[string]any, seen seenCache, cache *SchemaC return nil, err } - return NewMapSchema(schema, WithProps(ms.Props)), nil + return NewMapSchema(schema, WithProps(ms.Props), WithCustomLogicalType(LogicalType(ms.LogicalType))), nil } func parseUnion(namespace string, v []any, seen seenCache, cache *SchemaCache) (Schema, error) { @@ -494,7 +501,7 @@ func parseFixedLogicalType(size int, lt string, prec, scale int) LogicalSchema { return parseDecimalLogicalType(size, prec, scale) } - return nil + return getCustomLogicalSchema(ltyp) } func parseDecimalLogicalType(size, prec, scale int) LogicalSchema { diff --git a/schema_test.go b/schema_test.go index 1fa1461..440f126 100644 --- a/schema_test.go +++ b/schema_test.go @@ -983,6 +983,18 @@ func TestFixedSchema_HandlesProps(t *testing.T) { } func TestSchema_LogicalTypes(t *testing.T) { + customType := avro.LogicalType("customType") + err := avro.RegisterCustomLogicalType(customType) + require.NoError(t, err) + + // Should not be able to register the same type twice + err = avro.RegisterCustomLogicalType(customType) + require.Error(t, err) + + // should not be able to register a type with the same name as a built-in type + err = avro.RegisterCustomLogicalType(avro.Date) + require.Error(t, err) + tests := []struct { name string schema string @@ -997,6 +1009,41 @@ func TestSchema_LogicalTypes(t *testing.T) { wantType: avro.Int, wantLogical: false, }, + { + name: "Primitive Custom Type", + schema: `{"type": "int", "logicalType": "customType"}`, + wantType: avro.Int, + wantLogical: true, + wantLogicalType: customType, + }, + { + name: "Enum Custom Type", + schema: `{"type":"enum", "name":"test", "namespace": "org.hamba.avro", "symbols":["TEST"], "logicalType": "customType"}`, + wantType: avro.Enum, + wantLogical: true, + wantLogicalType: customType, + }, + { + name: "Array Custom Type", + schema: `{"type":"array", "items": "int", "logicalType": "customType"}`, + wantType: avro.Array, + wantLogical: true, + wantLogicalType: customType, + }, + { + name: "Map Custom Type", + schema: `{"type":"map", "values": "int", "logicalType": "customType"}`, + wantType: avro.Map, + wantLogical: true, + wantLogicalType: customType, + }, + { + name: "Record Custom Type", + schema: `{"type": "record", "name": "Foo", "fields": [{"name": "baz", "type": "string"}], "logicalType": "customType"}`, + wantType: avro.Record, + wantLogical: true, + wantLogicalType: customType, + }, { name: "Date", schema: `{"type": "int", "logicalType": "date"}`, From dc85aa401811d9aa988b7f3599e2306bde17bd94 Mon Sep 17 00:00:00 2001 From: Jonas Weile Date: Tue, 10 Sep 2024 11:21:41 +0000 Subject: [PATCH 2/5] docstrings --- schema.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/schema.go b/schema.go index ccc4010..796d6a6 100644 --- a/schema.go +++ b/schema.go @@ -97,6 +97,9 @@ func getCustomLogicalSchema(ltyp LogicalType) LogicalSchema { return nil } +// RegisterCustomLogicalType registers a custom logical type that is not part of the Avro specification. +// It returns an error if the logical type conflicts with a predefined logical type or if the logical +// type has already been registered. func RegisterCustomLogicalType(ltyp LogicalType) error { // Ensure that the custom logical type does not overwrite a primitive type switch ltyp { @@ -461,7 +464,10 @@ func WithAliases(aliases []string) SchemaOption { } } -// WithCustomLogicalType sets the logical type on a schema. +// WithCustomLogicalType sets a custom logical type on a schema. +// Make sure to register the custom logical type before using it, +// otherwise it will be ignored. +// See RegisterCustomLogicalType. func WithCustomLogicalType(ltyp LogicalType) SchemaOption { return func(opts *schemaConfig) { opts.customLogicalSchema = getCustomLogicalSchema(ltyp) @@ -653,6 +659,7 @@ func (s *RecordSchema) Type() Type { return Record } +// Logical returns the logical schema or nil. func (s *RecordSchema) Logical() LogicalSchema { return s.logical } From c47366d62d10c4e398bad6b89a3eee8487a8bf97 Mon Sep 17 00:00:00 2001 From: Jonas Weile Date: Tue, 10 Sep 2024 12:05:28 +0000 Subject: [PATCH 3/5] remove accidental addition --- config_internal_test.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/config_internal_test.go b/config_internal_test.go index 1ccf5ec..4d14625 100644 --- a/config_internal_test.go +++ b/config_internal_test.go @@ -182,30 +182,3 @@ func TestConfig_DisableCache_DoesNotReuseEncoders(t *testing.T) { assert.NotSame(t, enc1, enc2) } - -func TestConfig_RegisterCustomLogicalType(t *testing.T) { - type testObj struct { - A int64 `avro:"a"` - } - - api := Config{ - TagKey: "test", - BlockLength: 2, - DisableCaching: true, - }.Freeze() - cfg := api.(*frozenConfig) - - schema := MustParse(`{ - "type": "record", - "name": "test", - "fields" : [ - {"name": "a", "type": "long"} - ] -}`) - typ := reflect2.TypeOfPtr(testObj{}) - - enc1 := cfg.EncoderOf(schema, typ) - enc2 := cfg.EncoderOf(schema, typ) - - assert.NotSame(t, enc1, enc2) -} From d3b8ce47f773475a95d7c76cfc73e6a5520031c2 Mon Sep 17 00:00:00 2001 From: Jonas Weile Date: Wed, 11 Sep 2024 08:33:03 +0000 Subject: [PATCH 4/5] make custom logical types type specific --- schema.go | 66 ++++++++++++++++++++++++++++++------------------- schema_parse.go | 4 +-- schema_test.go | 14 ++++++----- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/schema.go b/schema.go index 796d6a6..cb1af31 100644 --- a/schema.go +++ b/schema.go @@ -82,25 +82,32 @@ type customLogicalSchema struct { PrimitiveLogicalSchema } -var customLogicalSchemas sync.Map // map[LogicalType]*CustomLogicalSchema +type customSchemaKey = struct { + typ Type + ltyp LogicalType +} + +var customLogicalSchemas sync.Map // map[customSchemaKey]*CustomLogicalSchema -func addCustomLogicalSchema(ltyp LogicalType) { - customLogicalSchemas.Store(ltyp, &customLogicalSchema{ +func addCustomLogicalSchema(typ Type, ltyp LogicalType) { + key := customSchemaKey{typ, ltyp} + customLogicalSchemas.Store(key, &customLogicalSchema{ PrimitiveLogicalSchema: PrimitiveLogicalSchema{typ: ltyp}, }) } -func getCustomLogicalSchema(ltyp LogicalType) LogicalSchema { - if ls, ok := customLogicalSchemas.Load(ltyp); ok { +func getCustomLogicalSchema(typ Type, ltyp LogicalType) LogicalSchema { + key := customSchemaKey{typ, ltyp} + if ls, ok := customLogicalSchemas.Load(key); ok { return ls.(*customLogicalSchema) } return nil } -// RegisterCustomLogicalType registers a custom logical type that is not part of the Avro specification. -// It returns an error if the logical type conflicts with a predefined logical type or if the logical -// type has already been registered. -func RegisterCustomLogicalType(ltyp LogicalType) error { +// RegisterCustomLogicalType registers a custom logical type that is not part of the Avro specification +// for the given types. +// It returns an error if the logical type conflicts with a predefined logical type. +func RegisterCustomLogicalType(ltyp LogicalType, types ...Type) error { // Ensure that the custom logical type does not overwrite a primitive type switch ltyp { case Decimal, @@ -116,12 +123,19 @@ func RegisterCustomLogicalType(ltyp LogicalType) error { return errors.New("logical type conflicts with a predefined logical type") } - // Ensure that the custom logical schema has not already been registered - if ls := getCustomLogicalSchema(ltyp); ls != nil { - return errors.New("logical type has already been registered") + // Check that all of the given type supports logical types + for _, typ := range types { + switch typ { + case Ref, Union, Null: + return fmt.Errorf("type %q does not support logical types", typ) + } + } + + // Register the custom logical type + for _, typ := range types { + addCustomLogicalSchema(typ, ltyp) } - addCustomLogicalSchema(ltyp) return nil } @@ -445,13 +459,13 @@ func (p properties) marshalPropertiesToJSON(buf *bytes.Buffer) error { } type schemaConfig struct { - aliases []string - doc string - def any - order Order - props map[string]any - wfp *[32]byte - customLogicalSchema LogicalSchema + aliases []string + doc string + def any + order Order + props map[string]any + wfp *[32]byte + customLogicalType LogicalType } // SchemaOption is a function that sets a schema option. @@ -470,7 +484,7 @@ func WithAliases(aliases []string) SchemaOption { // See RegisterCustomLogicalType. func WithCustomLogicalType(ltyp LogicalType) SchemaOption { return func(opts *schemaConfig) { - opts.customLogicalSchema = getCustomLogicalSchema(ltyp) + opts.customLogicalType = ltyp } } @@ -539,7 +553,7 @@ func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *Primitiv // If the logical schema is nil, use the custom logical schema. if l == nil { - l = cfg.customLogicalSchema + l = getCustomLogicalSchema(t, cfg.customLogicalType) } return &PrimitiveSchema{ @@ -638,7 +652,7 @@ func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOpti cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, fields: fields, doc: cfg.doc, - logical: cfg.customLogicalSchema, + logical: getCustomLogicalSchema(Record, cfg.customLogicalType), }, nil } @@ -998,7 +1012,7 @@ func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOptio symbols: symbols, def: def, doc: cfg.doc, - logical: cfg.customLogicalSchema, + logical: getCustomLogicalSchema(Enum, cfg.customLogicalType), }, nil } @@ -1163,7 +1177,7 @@ func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema { properties: newProperties(cfg.props, schemaReserved), cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, items: items, - logical: cfg.customLogicalSchema, + logical: getCustomLogicalSchema(Array, cfg.customLogicalType), } } @@ -1246,7 +1260,7 @@ func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema { properties: newProperties(cfg.props, schemaReserved), cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, values: values, - logical: cfg.customLogicalSchema, + logical: getCustomLogicalSchema(Map, cfg.customLogicalType), } } diff --git a/schema_parse.go b/schema_parse.go index 3874cfe..0c39f14 100644 --- a/schema_parse.go +++ b/schema_parse.go @@ -202,7 +202,7 @@ func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSche return parseDecimalLogicalType(-1, prec, scale) } - return getCustomLogicalSchema(ltyp) + return getCustomLogicalSchema(typ, ltyp) } type recordSchema struct { @@ -501,7 +501,7 @@ func parseFixedLogicalType(size int, lt string, prec, scale int) LogicalSchema { return parseDecimalLogicalType(size, prec, scale) } - return getCustomLogicalSchema(ltyp) + return getCustomLogicalSchema(Fixed, ltyp) } func parseDecimalLogicalType(size, prec, scale int) LogicalSchema { diff --git a/schema_test.go b/schema_test.go index 440f126..b5c2481 100644 --- a/schema_test.go +++ b/schema_test.go @@ -984,15 +984,11 @@ func TestFixedSchema_HandlesProps(t *testing.T) { func TestSchema_LogicalTypes(t *testing.T) { customType := avro.LogicalType("customType") - err := avro.RegisterCustomLogicalType(customType) + err := avro.RegisterCustomLogicalType(customType, avro.Int, avro.Enum, avro.Array, avro.Map, avro.Record) require.NoError(t, err) - // Should not be able to register the same type twice - err = avro.RegisterCustomLogicalType(customType) - require.Error(t, err) - // should not be able to register a type with the same name as a built-in type - err = avro.RegisterCustomLogicalType(avro.Date) + err = avro.RegisterCustomLogicalType(avro.Date, avro.Double) require.Error(t, err) tests := []struct { @@ -1009,6 +1005,12 @@ func TestSchema_LogicalTypes(t *testing.T) { wantType: avro.Int, wantLogical: false, }, + { + name: "Invalid", + schema: `{"type": "long", "logicalType": "customType"}`, + wantType: avro.Long, + wantLogical: false, + }, { name: "Primitive Custom Type", schema: `{"type": "int", "logicalType": "customType"}`, From 3786f0a3cc6c9f3cf54cfe3efb23d2e8b1b9c608 Mon Sep 17 00:00:00 2001 From: Jonas Weile Date: Mon, 16 Sep 2024 07:44:36 +0000 Subject: [PATCH 5/5] linting error --- schema.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/schema.go b/schema.go index cb1af31..ecd11c9 100644 --- a/schema.go +++ b/schema.go @@ -104,8 +104,8 @@ func getCustomLogicalSchema(typ Type, ltyp LogicalType) LogicalSchema { return nil } -// RegisterCustomLogicalType registers a custom logical type that is not part of the Avro specification -// for the given types. +// RegisterCustomLogicalType registers a custom logical type that is not part of the +// Avro specification for the given types. // It returns an error if the logical type conflicts with a predefined logical type. func RegisterCustomLogicalType(ltyp LogicalType, types ...Type) error { // Ensure that the custom logical type does not overwrite a primitive type @@ -709,7 +709,9 @@ func (s *RecordSchema) String() string { } if s.logical != nil { - return `{"name":"` + s.FullName() + `","type":"` + typ + `,"fields":[` + fields + `]` + `",` + s.logical.String() + `}` + return fmt.Sprintf("{\"name\":\"%s\", \"type\":\"%s\", \"fields\":[%s]\", %s}", + s.FullName(), typ, fields, s.logical.String(), + ) } return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}` @@ -1090,7 +1092,8 @@ func (s *EnumSchema) String() string { } if s.logical != nil { - return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]` + `",` + s.logical.String() + `}` + return fmt.Sprintf("{\"name\":\"%s\", \"type\":\"enum\", \"symbols\":[%s]\", %s}", + s.FullName(), symbols, s.logical.String()) } return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`