From cb3d6f5ca4f0525ed8ed91a04b220b8ad6d00be3 Mon Sep 17 00:00:00 2001 From: Reda Laanait Date: Fri, 6 Sep 2024 17:44:32 +0100 Subject: [PATCH] fix: Long timestamp default decoding --- codec.go | 7 +++-- codec_default.go | 27 +++++++++++++++++ codec_native.go | 37 ++++++++++++++++++----- schema_compatibility_test.go | 57 ++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 11 deletions(-) diff --git a/codec.go b/codec.go index 92c0991..fc6706e 100644 --- a/codec.go +++ b/codec.go @@ -11,9 +11,10 @@ import ( ) var ( - timeType = reflect.TypeOf(time.Time{}) - ratType = reflect.TypeOf(big.Rat{}) - durType = reflect.TypeOf(LogicalDuration{}) + timeType = reflect.TypeOf(time.Time{}) + timeDurationType = reflect.TypeOf(time.Duration(0)) + ratType = reflect.TypeOf(big.Rat{}) + durType = reflect.TypeOf(LogicalDuration{}) ) type null struct{} diff --git a/codec_default.go b/codec_default.go index c42bdc3..3155a3f 100644 --- a/codec_default.go +++ b/codec_default.go @@ -2,6 +2,7 @@ package avro import ( "fmt" + "time" "unsafe" "github.com/modern-go/reflect2" @@ -10,6 +11,32 @@ import ( func createDefaultDecoder(d *decoderContext, field *Field, typ reflect2.Type) ValDecoder { cfg := d.cfg fn := func(def any) ([]byte, error) { + // Solution 1: + + // def's Go type is decided by JSON decode. + // Force conversion of some Go types to ensure compatibility with AVRO codec. + switch schema := field.Type().Type(); schema { + case Long: + schema := field.Type().(*PrimitiveSchema) + if schema.Logical() == nil { + break + } + switch schema.Logical().Type() { + case TimestampMillis: + d, ok := def.(int64) + if !ok { + break + } + def = time.UnixMilli(d) + case TimestampMicros: + d, ok := def.(int64) + if !ok { + break + } + def = time.UnixMicro(d) + } + } + defaultType := reflect2.TypeOf(def) if defaultType == nil { defaultType = reflect2.TypeOf((*null)(nil)) diff --git a/codec_native.go b/codec_native.go index e4c5a41..ed097e0 100644 --- a/codec_native.go +++ b/codec_native.go @@ -83,15 +83,27 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode convert: createLongConverter(schema.encodedType), } - case st == Long && lt == "": + // Solution 2: + case st == Long: + timestampLogicalType := (lt == TimestampMillis || lt == TimestampMicros) + if timestampLogicalType && typ.Type1() == timeDurationType { + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", + typ.Type1().String(), schema.Type(), lt)} + } if resolved { return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)} } return &longCodec[int64]{} - case lt != "": - return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", - typ.String(), schema.Type(), lt)} + // case st == Long && lt == "": + // if resolved { + // return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)} + // } + // return &longCodec[int64]{} + + // case lt != "": + // return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", + // typ.String(), schema.Type(), lt)} default: break @@ -245,12 +257,21 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder { case st == Long && lt == TimeMicros: // time.Duration return &timeMicrosCodec{} - case st == Long && lt == "": + // Solution 2: + case st == Long: + timestampLogicalType := (lt == TimestampMillis || lt == TimestampMicros) + if timestampLogicalType && typ.Type1() == timeDurationType { + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", + typ.Type1().String(), schema.Type(), lt)} + } return &longCodec[int64]{} - case lt != "": - return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", - typ.String(), schema.Type(), lt)} + // case st == Long && lt == "": + // return &longCodec[int64]{} + + // case lt != "": + // return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", + // typ.String(), schema.Type(), lt)} default: break diff --git a/schema_compatibility_test.go b/schema_compatibility_test.go index 5b4ca13..83485cd 100644 --- a/schema_compatibility_test.go +++ b/schema_compatibility_test.go @@ -2,6 +2,7 @@ package avro_test import ( "math/big" + "strconv" "testing" "time" @@ -815,6 +816,62 @@ func TestSchemaCompatibility_Resolve(t *testing.T) { "b": map[string]any{"a": int64(20)}, }, }, + { + name: "Record Writer Field Missing With Long timestamp-millis Default", + reader: `{ + "type":"record", "name":"test", "namespace": "org.hamba.avro", + "fields":[ + {"name": "a", "type": "string"}, + { + "name": "b", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + }, + "default": ` + strconv.FormatInt(1725616800000, 10) + ` + } + ] + }`, + writer: `{ + "type":"record", "name":"test", "namespace": "org.hamba.avro", + "fields":[ + {"name": "a", "type": "string"} + ] + }`, + value: map[string]any{"a": "foo"}, + want: map[string]any{ + "a": "foo", + "b": time.UnixMilli(1725616800000).UTC(), // 2024-09-06 10:00:00 + }, + }, + { + name: "Record Writer Field Missing With Long timestamp-micros Default", + reader: `{ + "type":"record", "name":"test", "namespace": "org.hamba.avro", + "fields":[ + {"name": "a", "type": "string"}, + { + "name": "b", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + }, + "default": ` + strconv.FormatInt(1725616800000000, 10) + ` + } + ] + }`, + writer: `{ + "type":"record", "name":"test", "namespace": "org.hamba.avro", + "fields":[ + {"name": "a", "type": "string"} + ] + }`, + value: map[string]any{"a": "foo"}, + want: map[string]any{ + "a": "foo", + "b": time.UnixMicro(1725616800000000).UTC(), // 2024-09-06 10:00:00 + }, + }, } for _, test := range tests {