Skip to content

Commit

Permalink
feat: make schema tests parallel (#394)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored May 6, 2024
1 parent 9b663e1 commit 0e4c8f9
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 54 deletions.
26 changes: 15 additions & 11 deletions pkg/crc64/crc64.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ package crc64

import (
"hash"
"sync"
)

// init builds the CRC-64 lookup table once at program startup,
// replacing the previous lazy sync.Once initialization so that
// later reads of crc64Table need no synchronization.
func init() {
buildTable()
}

// Size is the size of a CRC-64 checksum in bytes.
const Size = 8

Expand All @@ -28,14 +31,7 @@ func makeTable() *Table {
return t
}

var (
tableBuildOnce sync.Once
crc64Table *Table
)

func buildTableOnce() {
tableBuildOnce.Do(buildTable)
}
var crc64Table *Table

func buildTable() {
crc64Table = makeTable()
Expand All @@ -49,8 +45,6 @@ type digest struct {
// New creates a new hash.Hash64 computing the Avro CRC-64 checksum.
// Its Sum method will lay the value out in big-endian byte order.
func New() hash.Hash64 {
buildTableOnce()

return &digest{
crc: Empty,
tab: crc64Table,
Expand Down Expand Up @@ -91,3 +85,13 @@ func (d *digest) Sum(in []byte) []byte {
s := d.Sum64()
return append(in, byte(s>>56), byte(s>>48), byte(s>>40), byte(s>>32), byte(s>>24), byte(s>>16), byte(s>>8), byte(s))
}

// Sum returns the CRC-64 checksum of data, laid out in big-endian byte
// order. It is a convenience wrapper equivalent to writing data to a
// fresh hash from New and calling Sum64.
func Sum(data []byte) [Size]byte {
	// The digest is stack-allocated; Reset re-establishes the initial
	// CRC state before writing.
	d := digest{crc: Empty, tab: crc64Table}
	d.Reset()
	_, _ = d.Write(data)
	s := d.Sum64()
	//nolint:lll
	return [Size]byte{byte(s >> 56), byte(s >> 48), byte(s >> 40), byte(s >> 32), byte(s >> 24), byte(s >> 16), byte(s >> 8), byte(s)}
}
26 changes: 14 additions & 12 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/sha256"
"errors"
"fmt"
"hash"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -96,12 +95,6 @@ const (
SHA256 FingerprintType = "SHA256"
)

var fingerprinters = map[FingerprintType]hash.Hash{
CRC64Avro: crc64.New(),
MD5: md5.New(),
SHA256: sha256.New(),
}

// SchemaCache is a cache of schemas.
type SchemaCache struct {
cache sync.Map // map[string]Schema
Expand Down Expand Up @@ -290,14 +283,23 @@ func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Strin
return v.([]byte), nil
}

h, ok := fingerprinters[typ]
if !ok {
data := []byte(stringer.String())

var fingerprint []byte
switch typ {
case CRC64Avro:
h := crc64.Sum(data)
fingerprint = h[:]
case MD5:
h := md5.Sum(data)
fingerprint = h[:]
case SHA256:
h := sha256.Sum256(data)
fingerprint = h[:]
default:
return nil, fmt.Errorf("avro: unknown fingerprint algorithm %s", typ)
}

h.Reset()
_, _ = h.Write([]byte(stringer.String()))
fingerprint := h.Sum(make([]byte, 0, h.Size()))
f.cache.Store(typ, fingerprint)
return fingerprint, nil
}
Expand Down
4 changes: 3 additions & 1 deletion schema_canonical_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ func TestSchema_Canonical(t *testing.T) {
for i, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
s, err := avro.Parse(test.input)
t.Parallel()

s, err := avro.ParseWithCache(test.input, "", &avro.SchemaCache{})

require.NoError(t, err)
assert.Equal(t, test.canonical, s.String())
Expand Down
19 changes: 14 additions & 5 deletions schema_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/hamba/avro/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewSchemaCompatibility(t *testing.T) {
Expand Down Expand Up @@ -243,11 +244,15 @@ func TestSchemaCompatibility_Compatible(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
r := avro.MustParse(test.reader)
w := avro.MustParse(test.writer)
t.Parallel()

r, err := avro.ParseWithCache(test.reader, "", &avro.SchemaCache{})
require.NoError(t, err)
w, err := avro.ParseWithCache(test.writer, "", &avro.SchemaCache{})
require.NoError(t, err)
sc := avro.NewSchemaCompatibility()

err := sc.Compatible(r, w)
err = sc.Compatible(r, w)

test.wantErr(t, err)
})
Expand Down Expand Up @@ -808,8 +813,12 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
r := avro.MustParse(test.reader)
w := avro.MustParse(test.writer)
t.Parallel()

r, err := avro.ParseWithCache(test.reader, "", &avro.SchemaCache{})
require.NoError(t, err)
w, err := avro.ParseWithCache(test.writer, "", &avro.SchemaCache{})
require.NoError(t, err)
sc := avro.NewSchemaCompatibility()

b, err := avro.Marshal(w, test.value)
Expand Down
8 changes: 8 additions & 0 deletions schema_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ func TestIsValidDefault(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()

got, ok := isValidDefault(test.schemaFn(), test.def)

assert.Equal(t, test.wantOk, ok)
Expand Down Expand Up @@ -451,6 +453,8 @@ func TestSchema_IsPromotable(t *testing.T) {
for i, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

ok := isPromotable(test.writerTyp, test.readerType)

assert.Equal(t, test.want, ok)
Expand Down Expand Up @@ -526,6 +530,8 @@ func TestSchema_IsNative(t *testing.T) {
for i, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

ok := isNative(test.typ)
assert.Equal(t, test.wantOk, ok)
})
Expand Down Expand Up @@ -632,6 +638,8 @@ func TestEnumSchema_GetSymbol(t *testing.T) {
for i, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Parallel()

got, ok := test.schemaFn().Symbol(test.idx)
assert.Equal(t, test.wantOk, ok)
if ok {
Expand Down
6 changes: 4 additions & 2 deletions schema_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,12 @@ func TestSchema_JSON(t *testing.T) {
for i, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
s, err := avro.Parse(test.input)
t.Parallel()

schema, err := avro.ParseWithCache(test.input, "", &avro.SchemaCache{})
require.NoError(t, err)

b, err := json.Marshal(s)
b, err := json.Marshal(schema)

require.NoError(t, err)
assert.Equal(t, test.json, string(b))
Expand Down
73 changes: 50 additions & 23 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@ func TestPrimitiveSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.schema, func(t *testing.T) {
s, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

require.NoError(t, err)
assert.Equal(t, test.want, s.Type())
assert.Equal(t, test.wantFingerprint, s.Fingerprint())
assert.Equal(t, test.want, schema.Type())
assert.Equal(t, test.wantFingerprint, schema.Fingerprint())
})
}
}
Expand Down Expand Up @@ -257,11 +259,13 @@ func TestRecordSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
s, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
if s != nil {
assert.Equal(t, avro.Record, s.Type())
if schema != nil {
assert.Equal(t, avro.Record, schema.Type())
}
})
}
Expand Down Expand Up @@ -315,13 +319,14 @@ func TestErrorRecordSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()

s, err := avro.Parse(test.schema)
schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
if test.wantSchema {
assert.Equal(t, avro.Record, s.Type())
recSchema := s.(*avro.RecordSchema)
assert.Equal(t, avro.Record, schema.Type())
recSchema := schema.(*avro.RecordSchema)
assert.True(t, recSchema.IsError())
}
})
Expand Down Expand Up @@ -429,7 +434,9 @@ func TestRecordSchema_ValidatesDefault(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
_, err := avro.Parse(test.schema)
t.Parallel()

_, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
})
Expand Down Expand Up @@ -477,11 +484,13 @@ func TestRecordSchema_ValidatesOrder(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
s, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
if test.want != "" {
rs := s.(*avro.RecordSchema)
rs := schema.(*avro.RecordSchema)
require.Len(t, rs.Fields(), 1)
assert.Equal(t, test.want, rs.Fields()[0].Order())
}
Expand Down Expand Up @@ -666,7 +675,9 @@ func TestEnumSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
schema, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
if test.wantName != "" {
Expand Down Expand Up @@ -719,7 +730,9 @@ func TestArraySchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got, err := avro.Parse(test.schema)
t.Parallel()

got, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
assert.Equal(t, test.want, got)
Expand Down Expand Up @@ -765,7 +778,9 @@ func TestMapSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got, err := avro.Parse(test.schema)
t.Parallel()

got, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
assert.Equal(t, test.want, got)
Expand Down Expand Up @@ -827,12 +842,14 @@ func TestUnionSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
s, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
if test.wantFingerprint != [32]byte{} {
assert.Equal(t, avro.Union, s.Type())
assert.Equal(t, test.wantFingerprint, s.Fingerprint())
assert.Equal(t, avro.Union, schema.Type())
assert.Equal(t, test.wantFingerprint, schema.Fingerprint())
}
})
}
Expand Down Expand Up @@ -864,10 +881,12 @@ func TestUnionSchema_Indices(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
s, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

require.NoError(t, err)
null, typ := s.(*avro.UnionSchema).Indices()
null, typ := schema.(*avro.UnionSchema).Indices()
assert.Equal(t, test.want[0], null)
assert.Equal(t, test.want[1], typ)
})
Expand Down Expand Up @@ -929,7 +948,9 @@ func TestFixedSchema(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
schema, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})

test.wantErr(t, err)
if test.wantFingerprint != [32]byte{} {
Expand Down Expand Up @@ -1129,9 +1150,11 @@ func TestSchema_LogicalTypes(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
schema, err := avro.Parse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})
require.NoError(t, err)

assert.Equal(t, test.wantType, schema.Type())

lts, ok := schema.(avro.LogicalTypeSchema)
Expand Down Expand Up @@ -1228,7 +1251,11 @@ func TestSchema_FingerprintUsing(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
schema := avro.MustParse(test.schema)
t.Parallel()

schema, err := avro.ParseWithCache(test.schema, "", &avro.SchemaCache{})
require.NoError(t, err)

got, err := schema.FingerprintUsing(test.typ)

require.NoError(t, err)
Expand Down

0 comments on commit 0e4c8f9

Please sign in to comment.