Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve OCF encoder/decoder handling of dynamic types #467

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import (
"bytes"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -20,10 +21,11 @@
codecKey = "avro.codec"
)

var magicBytes = [4]byte{'O', 'b', 'j', 1}
var (
magicBytes = [4]byte{'O', 'b', 'j', 1}

// HeaderSchema is the Avro schema of a container file header.
var HeaderSchema = avro.MustParse(`{
// HeaderSchema is the Avro schema of a container file header.
HeaderSchema = avro.MustParse(`{
"type": "record",
"name": "org.apache.avro.file.Header",
"fields": [
Expand All @@ -33,6 +35,15 @@
]
}`)

// DefaultSchemaMarshaler calls the schema's String() method to produce
// a "canonical" schema.
DefaultSchemaMarshaler = defaultMarshalSchema
// FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
// a schema with all details preserved. The "canonical" schema returned by
// the default marshaler does not preserve a type's extra properties.
FullSchemaMarshaler = fullMarshalSchema
)

// Header represents an Avro container file header.
type Header struct {
Magic [4]byte `avro:"magic"`
Expand All @@ -42,6 +53,7 @@

// decoderConfig holds the settings a Decoder is created with. NewDecoder
// fills in the defaults and then applies every DecoderFunc to it.
type decoderConfig struct {
// DecoderConfig is the avro.API configuration; NewDecoder defaults it to
// avro.DefaultConfig.
DecoderConfig avro.API
// SchemaCache is handed to readHeader for parsing the schema stored in the
// file header; NewDecoder defaults it to avro.DefaultSchemaCache.
SchemaCache *avro.SchemaCache
}

// DecoderFunc represents a configuration function for Decoder.
Expand All @@ -54,13 +66,22 @@
}
}

// WithDecoderSchemaCache sets the schema cache the decoder uses when parsing
// the schema stored in the file header.
// If not specified, defaults to avro.DefaultSchemaCache. A nil cache is
// ignored so that the cache already configured stays in place.
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
	return func(cfg *decoderConfig) {
		if cache == nil {
			// Do not replace a usable cache with nil; the configured cache
			// is passed straight to the schema parser.
			return
		}
		cfg.SchemaCache = cache
	}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
reader *avro.Reader
resetReader *bytesx.ResetReader
decoder *avro.Decoder
meta map[string][]byte
sync [16]byte
schema avro.Schema

codec Codec

Expand All @@ -71,14 +92,15 @@
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
cfg := decoderConfig{
DecoderConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
}
for _, opt := range opts {
opt(&cfg)
}

reader := avro.NewReader(r, 1024)

h, err := readHeader(reader)
h, err := readHeader(reader, cfg.SchemaCache)
if err != nil {
return nil, fmt.Errorf("decoder: %w", err)
}
Expand All @@ -92,6 +114,7 @@
meta: h.Meta,
sync: h.Sync,
codec: h.Codec,
schema: h.Schema,
}, nil
}

Expand All @@ -100,6 +123,10 @@
return d.meta
}

func (d *Decoder) Schema() avro.Schema {

Check failure on line 126 in ocf/ocf.go

View workflow job for this annotation

GitHub Actions / test (1.23)

exported: exported method Decoder.Schema should have comment or be unexported (revive)
return d.schema
}

// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
if d.count <= 0 {
Expand Down Expand Up @@ -174,6 +201,8 @@
Metadata map[string][]byte
Sync [16]byte
EncodingConfig avro.API
SchemaCache *avro.SchemaCache
SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for Encoder.
Expand Down Expand Up @@ -209,6 +238,22 @@
}
}

// WithEncoderSchemaCache sets the schema cache the encoder uses when it
// parses a schema, either from an existing file header or from the schema
// it was created with.
// If not specified, defaults to avro.DefaultSchemaCache. A nil cache is
// ignored so that the cache already configured stays in place.
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
	return func(cfg *encoderConfig) {
		if cache == nil {
			// Do not replace a usable cache with nil; the configured cache
			// is passed straight to the schema parser.
			return
		}
		cfg.SchemaCache = cache
	}
}

// WithSchemaMarshaler sets the function used to serialise the schema that is
// stored in the file header metadata.
// If not specified, defaults to DefaultSchemaMarshaler. A nil marshaler is
// ignored: the encoder calls the configured marshaler unconditionally, so
// storing nil would panic when the header is built.
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
	return func(cfg *encoderConfig) {
		if m == nil {
			return
		}
		cfg.SchemaMarshaler = m
	}
}

// WithSyncBlock sets the sync block.
func WithSyncBlock(sync [16]byte) EncoderFunc {
return func(cfg *encoderConfig) {
Expand Down Expand Up @@ -241,12 +286,36 @@
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
return newEncoder(
func(cache *avro.SchemaCache) (avro.Schema, error) {
nrwiersma marked this conversation as resolved.
Show resolved Hide resolved
return avro.ParseWithCache(s, "", cache)
},
w,
opts...)
}

// NewEncoderWithSchema returns a new encoder that writes to w using the
// given, already parsed schema.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
//
// A nil schema is rejected with an error at the point where newEncoder asks
// for the schema, instead of being passed on to the schema marshaler.
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	return newEncoder(
		func(_ *avro.SchemaCache) (avro.Schema, error) {
			if schema == nil {
				return nil, errors.New("schema must not be nil")
			}
			// The schema is already parsed, so the cache is not needed.
			return schema, nil
		},
		w,
		opts...)
}

func newEncoder(getSchema func(*avro.SchemaCache) (avro.Schema, error), w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
cfg := encoderConfig{
BlockLength: 100,
CodecName: Null,
CodecCompression: -1,
Metadata: map[string][]byte{},
EncodingConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
SchemaMarshaler: DefaultSchemaMarshaler,
}
for _, opt := range opts {
opt(&cfg)
Expand All @@ -263,7 +332,7 @@

if info.Size() > 0 {
reader := avro.NewReader(file, 1024)
h, err := readHeader(reader)
h, err := readHeader(reader, cfg.SchemaCache)
if err != nil {
return nil, err
}
Expand All @@ -285,12 +354,16 @@
}
}

schema, err := avro.Parse(s)
schema, err := getSchema(cfg.SchemaCache)
if err != nil {
return nil, err
}
schemaJSON, err := cfg.SchemaMarshaler(schema)
if err != nil {
return nil, err
}

cfg.Metadata[schemaKey] = []byte(schema.String())
cfg.Metadata[schemaKey] = schemaJSON
cfg.Metadata[codecKey] = []byte(cfg.CodecName)
header := Header{
Magic: magicBytes,
Expand Down Expand Up @@ -400,7 +473,7 @@
Sync [16]byte
}

func readHeader(reader *avro.Reader) (*ocfHeader, error) {
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader, error) {
var h Header
reader.ReadVal(HeaderSchema, &h)
if reader.Error != nil {
Expand All @@ -410,7 +483,7 @@
if h.Magic != magicBytes {
return nil, errors.New("invalid avro file")
}
schema, err := avro.Parse(string(h.Meta[schemaKey]))
schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -447,3 +520,11 @@
}
}
}

// defaultMarshalSchema renders schema through its String method and returns
// the text as bytes. The returned error is always nil.
func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
	canonical := schema.String()
	return []byte(canonical), nil
}

// fullMarshalSchema encodes schema with encoding/json and returns the
// resulting bytes, or the encoding error.
func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
	encoded, err := json.Marshal(schema)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
Loading
Loading