
Commit

Merge pull request #52 from reproio/decoder
Implement stream-based input record decoders instead of batch-based ones
syucream authored Aug 17, 2020
2 parents 5d0bb2b + d2d8591 commit be6cb1e
Showing 24 changed files with 384 additions and 2,464 deletions.
34 changes: 34 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,34 @@
# v0.1.x

## Release v0.1.0 - 2020/08/xx

### Breaking changes

- `Columnifier` interface stops supporting `io.WriteCloser`

### Enhancement

- #52 Implement stream-based input record decoders to reduce memory consumption.


# v0.0.x

## Release v0.0.4 - 2020/07/29

### Bug fix

- Fix #49 Capture the first error caused by columnifier

... And some small improvements.

## Release v0.0.3 - 2020/06/02

Ready to publish columnify as an OSS.

## Release v0.0.2 - 2020/06/02

Ready for production.

## Release v0.0.1 - 2020/04/19

Initial release.
4 changes: 2 additions & 2 deletions columnifier/columnifier.go
@@ -4,9 +4,9 @@ import "io"
 
 // Columnifier is the interface that converts input file to columnar format file.
 type Columnifier interface {
-	io.WriteCloser
-
+	WriteFromReader(reader io.Reader) (int, error)
 	WriteFromFiles(paths []string) (int, error)
+	Close() error
 }
 
 // NewColumnifier creates a new Columnifier.
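For orientation, here is a minimal sketch of how a caller might drive the revised interface: records are streamed from any `io.Reader`, and the writer is closed explicitly since the interface no longer embeds `io.WriteCloser`. Only the `WriteFromReader`, `WriteFromFiles`, and `Close` signatures come from the diff above; the helper function and package name below are hypothetical.

```go
package example

import (
	"os"

	"github.com/reproio/columnify/columnifier"
)

// columnifyStdin is a hypothetical caller: it streams records from stdin
// through any Columnifier without buffering the whole input in memory,
// then finalizes the output with an explicit Close.
func columnifyStdin(c columnifier.Columnifier) error {
	if _, err := c.WriteFromReader(os.Stdin); err != nil {
		return err
	}
	return c.Close()
}
```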
46 changes: 24 additions & 22 deletions columnifier/parquet.go
@@ -1,7 +1,11 @@
 package columnifier
 
 import (
+	"io"
 	"io/ioutil"
+	"os"
 
+	"github.com/xitongsys/parquet-go/marshal"
+
 	"github.com/reproio/columnify/record"

@@ -57,6 +61,9 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi
 	w.RowGroupSize = config.Parquet.RowGroupSize
 	w.CompressionType = config.Parquet.CompressionCodec
 
+	// Intermediate record type is string typed JSON values
+	w.MarshalFunc = marshal.MarshalJSON
+
 	return &parquetColumnifier{
 		w:      w,
 		schema: intermediateSchema,
@@ -65,36 +72,30 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi
 }
 
 // Write reads, converts input binary data and write it to buffer.
-func (c *parquetColumnifier) Write(data []byte) (int, error) {
-	// Intermediate record type is map[string]interface{}
-	c.w.MarshalFunc = parquet.MarshalMap
-	records, err := record.FormatToMap(data, c.schema, c.rt)
+func (c *parquetColumnifier) WriteFromReader(reader io.Reader) (int, error) {
+	decoder, err := record.NewJsonStringConverter(reader, c.schema, c.rt)
 	if err != nil {
 		return -1, err
 	}
 
 	beforeSize := c.w.Size
-	for _, r := range records {
-		if err := c.w.Write(r); err != nil {
+	for {
+		var v string
+		err = decoder.Convert(&v)
+		if err != nil {
+			if err == io.EOF {
+				break
+			} else {
+				return -1, err
+			}
+		}
+
+		if err := c.w.Write(v); err != nil {
 			return -1, err
 		}
 	}
 	afterSize := c.w.Size
 
-	// Intermediate record type is wrapped Apache Arrow record
-	// It requires Arrow Golang implementation more logical type supports
-	// ref. https://github.com/apache/arrow/blob/9c9dc2012266442d0848e4af0cf52874bc4db151/go/arrow/array/builder.go#L211
-	/*
-		c.w.MarshalFunc = parquet.MarshalArrow
-		records, err := record.FormatToArrow(data, c.schema, c.rt)
-		if err != nil {
-			return err
-		}
-		if err := c.w.Write(&records); err != nil {
-			return err
-		}
-	*/
-
 	return int(afterSize - beforeSize), nil
 }
 
@@ -103,11 +104,12 @@ func (c *parquetColumnifier) WriteFromFiles(paths []string) (int, error) {
 	var n int
 
 	for _, p := range paths {
-		data, err := ioutil.ReadFile(p)
+		f, err := os.Open(p)
 		if err != nil {
 			return -1, err
 		}
-		if n, err = c.Write(data); err != nil {
+
+		if n, err = c.WriteFromReader(f); err != nil {
 			return -1, err
 		}
 	}
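The memory reduction cited in the changelog comes from the pattern now visible in `WriteFromReader`: each record is decoded and handed to the Parquet writer as soon as it is read, instead of the whole input being materialized first. As a rough, stdlib-only illustration of the difference (this is not columnify's decoder; the function names and the JSON-array vs. concatenated-values assumptions are mine), compare a batch decode with a streaming `json.Decoder` loop:

```go
package example

import (
	"encoding/json"
	"io"
	"io/ioutil"
)

// decodeBatch assumes the input is one JSON array. It reads the entire
// input into memory before decoding, so peak memory grows with input size.
func decodeBatch(r io.Reader) ([]map[string]interface{}, error) {
	data, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, err
	}
	var records []map[string]interface{}
	if err := json.Unmarshal(data, &records); err != nil {
		return nil, err
	}
	return records, nil
}

// decodeStream assumes concatenated JSON values (e.g. JSON Lines). It decodes
// one record at a time and passes it to sink, so peak memory stays roughly
// constant; the loop ends on io.EOF, like the Convert loop above.
func decodeStream(r io.Reader, sink func(map[string]interface{}) error) error {
	dec := json.NewDecoder(r)
	for {
		var rec map[string]interface{}
		if err := dec.Decode(&rec); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		if err := sink(rec); err != nil {
			return err
		}
	}
}
```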
12 changes: 0 additions & 12 deletions parquet/doc.go

This file was deleted.

