Improve OCF encoder/decoder handling of dynamic types #467

Merged · 5 commits · Oct 11, 2024
ocf/ocf.go (113 changes: 95 additions & 18 deletions)
@@ -6,6 +6,7 @@ package ocf
import (
"bytes"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
@@ -20,10 +21,11 @@ const (
codecKey = "avro.codec"
)

var magicBytes = [4]byte{'O', 'b', 'j', 1}
var (
magicBytes = [4]byte{'O', 'b', 'j', 1}

// HeaderSchema is the Avro schema of a container file header.
var HeaderSchema = avro.MustParse(`{
// HeaderSchema is the Avro schema of a container file header.
HeaderSchema = avro.MustParse(`{
"type": "record",
"name": "org.apache.avro.file.Header",
"fields": [
@@ -33,6 +35,15 @@ var HeaderSchema = avro.MustParse(`{
]
}`)

// DefaultSchemaMarshaler calls the schema's String() method, to produce
// a "canonical" schema.
DefaultSchemaMarshaler = defaultMarshalSchema
// FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
// a schema with all details preserved. The "canonical" schema returned by
// the default marshaler does not preserve a type's extra properties.
FullSchemaMarshaler = fullMarshalSchema
)

// Header represents an Avro container file header.
type Header struct {
Magic [4]byte `avro:"magic"`
@@ -42,6 +53,7 @@ type Header struct {

type decoderConfig struct {
DecoderConfig avro.API
SchemaCache *avro.SchemaCache
}

// DecoderFunc represents a configuration function for Decoder.
@@ -54,13 +66,22 @@ func WithDecoderConfig(wCfg avro.API) DecoderFunc {
}
}

// WithDecoderSchemaCache sets the schema cache for the decoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
return func(cfg *decoderConfig) {
cfg.SchemaCache = cache
}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
reader *avro.Reader
resetReader *bytesx.ResetReader
decoder *avro.Decoder
meta map[string][]byte
sync [16]byte
schema avro.Schema

codec Codec

@@ -71,14 +92,15 @@ type Decoder struct {
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
cfg := decoderConfig{
DecoderConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
}
for _, opt := range opts {
opt(&cfg)
}

reader := avro.NewReader(r, 1024)

h, err := readHeader(reader)
h, err := readHeader(reader, cfg.SchemaCache)
if err != nil {
return nil, fmt.Errorf("decoder: %w", err)
}
@@ -92,6 +114,7 @@ func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
meta: h.Meta,
sync: h.Sync,
codec: h.Codec,
schema: h.Schema,
}, nil
}

@@ -100,6 +123,12 @@ func (d *Decoder) Metadata() map[string][]byte {
return d.meta
}

// Schema returns the schema that was parsed from the file's metadata
// and that is used to interpret the file's contents.
func (d *Decoder) Schema() avro.Schema {
return d.schema
}
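A minimal usage sketch for the new WithDecoderSchemaCache option and the Schema accessor; the file name data.avro and the generic decode target are illustrative assumptions, not part of this change:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
)

func main() {
	f, err := os.Open("data.avro") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// A private cache keeps named types parsed from this file's header
	// out of the shared avro.DefaultSchemaCache.
	dec, err := ocf.NewDecoder(f, ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
	if err != nil {
		log.Fatal(err)
	}

	// The new accessor exposes the schema parsed from the file header.
	fmt.Println(dec.Schema().String())

	for dec.HasNext() {
		var v any
		if err := dec.Decode(&v); err != nil {
			log.Fatal(err)
		}
		fmt.Println(v)
	}
}
```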

// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
if d.count <= 0 {
@@ -174,6 +203,8 @@ type encoderConfig struct {
Metadata map[string][]byte
Sync [16]byte
EncodingConfig avro.API
SchemaCache *avro.SchemaCache
SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for Encoder.
@@ -209,6 +240,22 @@ func WithMetadata(meta map[string][]byte) EncoderFunc {
}
}

// WithEncoderSchemaCache sets the schema cache for the encoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
return func(cfg *encoderConfig) {
cfg.SchemaCache = cache
}
}

// WithSchemaMarshaler sets the schema marshaler for the encoder.
// If not specified, defaults to DefaultSchemaMarshaler.
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
return func(cfg *encoderConfig) {
cfg.SchemaMarshaler = m
}
}
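A sketch of how the two new encoder options might be combined; the schema, the record struct, and the in-memory buffer are illustrative assumptions:

```go
package main

import (
	"bytes"
	"log"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
)

// record is a hypothetical type matching the schema below.
type record struct {
	A int64 `avro:"a"`
}

const schemaStr = `{
	"type": "record",
	"name": "example.Record",
	"fields": [{"name": "a", "type": "long"}]
}`

func main() {
	buf := &bytes.Buffer{}

	enc, err := ocf.NewEncoder(schemaStr, buf,
		// Keep named types parsed here out of the shared avro.DefaultSchemaCache.
		ocf.WithEncoderSchemaCache(&avro.SchemaCache{}),
		// Store the schema with all details preserved instead of the canonical form.
		ocf.WithSchemaMarshaler(ocf.FullSchemaMarshaler),
	)
	if err != nil {
		log.Fatal(err)
	}

	if err := enc.Encode(record{A: 1}); err != nil {
		log.Fatal(err)
	}
	if err := enc.Close(); err != nil {
		log.Fatal(err)
	}
}
```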

// WithSyncBlock sets the sync block.
func WithSyncBlock(sync [16]byte) EncoderFunc {
return func(cfg *encoderConfig) {
@@ -241,17 +288,23 @@ type Encoder struct {
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
cfg := encoderConfig{
BlockLength: 100,
CodecName: Null,
CodecCompression: -1,
Metadata: map[string][]byte{},
EncodingConfig: avro.DefaultConfig,
}
for _, opt := range opts {
opt(&cfg)
cfg := computeEncoderConfig(opts)
schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
if err != nil {
return nil, err
}
return newEncoder(schema, w, cfg)
}

// NewEncoderWithSchema returns a new encoder that writes to w using schema s.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
return newEncoder(schema, w, computeEncoderConfig(opts))
}
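A sketch of the new constructor for callers that already hold a parsed avro.Schema and want to skip re-parsing the JSON string; the primitive schema and the buffer are placeholder assumptions:

```go
package main

import (
	"bytes"
	"log"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
)

func main() {
	// A schema parsed (or constructed) elsewhere in the program.
	schema := avro.MustParse(`"string"`)

	buf := &bytes.Buffer{}
	enc, err := ocf.NewEncoderWithSchema(schema, buf)
	if err != nil {
		log.Fatal(err)
	}

	if err := enc.Encode("hello"); err != nil {
		log.Fatal(err)
	}
	if err := enc.Close(); err != nil {
		log.Fatal(err)
	}
}
```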

func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
switch file := w.(type) {
case nil:
return nil, errors.New("writer cannot be nil")
@@ -263,7 +316,7 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {

if info.Size() > 0 {
reader := avro.NewReader(file, 1024)
h, err := readHeader(reader)
h, err := readHeader(reader, cfg.SchemaCache)
if err != nil {
return nil, err
}
@@ -285,12 +338,12 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
}
}

schema, err := avro.Parse(s)
schemaJSON, err := cfg.SchemaMarshaler(schema)
if err != nil {
return nil, err
}

cfg.Metadata[schemaKey] = []byte(schema.String())
cfg.Metadata[schemaKey] = schemaJSON
cfg.Metadata[codecKey] = []byte(cfg.CodecName)
header := Header{
Magic: magicBytes,
@@ -324,6 +377,22 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
return e, nil
}

func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
cfg := encoderConfig{
BlockLength: 100,
CodecName: Null,
CodecCompression: -1,
Metadata: map[string][]byte{},
EncodingConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
SchemaMarshaler: DefaultSchemaMarshaler,
}
for _, opt := range opts {
opt(&cfg)
}
return cfg
}

// Write v to the internal buffer. This method skips the internal encoder and
// therefore the caller is responsible for encoding the bytes. No error will be
// thrown if the bytes do not conform to the schema given to NewEncoder, but
@@ -400,7 +469,7 @@ type ocfHeader struct {
Sync [16]byte
}

func readHeader(reader *avro.Reader) (*ocfHeader, error) {
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader, error) {
var h Header
reader.ReadVal(HeaderSchema, &h)
if reader.Error != nil {
@@ -410,7 +479,7 @@ func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader, error) {
if h.Magic != magicBytes {
return nil, errors.New("invalid avro file")
}
schema, err := avro.Parse(string(h.Meta[schemaKey]))
schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
if err != nil {
return nil, err
}
@@ -447,3 +516,11 @@ func skipToEnd(reader *avro.Reader, sync [16]byte) error {
}
}
}

func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
return []byte(schema.String()), nil
}

func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
return json.Marshal(schema)
}
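A small sketch contrasting the two marshalers; the extra "my-prop" attribute on the record is an assumption used only to show what the canonical form drops:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
)

func main() {
	schema := avro.MustParse(`{
		"type": "record",
		"name": "example.Record",
		"my-prop": "only kept by FullSchemaMarshaler",
		"fields": [{"name": "a", "type": "long"}]
	}`)

	canonical, err := ocf.DefaultSchemaMarshaler(schema)
	if err != nil {
		log.Fatal(err)
	}
	full, err := ocf.FullSchemaMarshaler(schema)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(string(canonical)) // canonical-style form; extra properties dropped
	fmt.Println(string(full))      // json.Marshal output; extra properties preserved
}
```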