Skip to content

Commit

Permalink
uploader: propagate errors in compress
Browse files Browse the repository at this point in the history
  • Loading branch information
teqwve committed Feb 5, 2024
1 parent 67d4abb commit db71271
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions uploader/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,20 +213,33 @@ func (u *Base) uploadWorker(ctx context.Context) {
}
}

func compress(data io.Reader) io.Reader {
func compress(data *io.PipeReader) io.Reader {
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)

go func() {
_, _ = io.Copy(gw, data)
gw.Close()
_, err := io.Copy(gw, data)
if err != nil {
pw.CloseWithError(err)
data.CloseWithError(err)
return
}

err = gw.Close()
if err != nil {
pw.CloseWithError(err)
data.CloseWithError(err)
return
}

pw.Close()
data.Close()
}()

return pr
}

func (u *Base) insertRowBinary(table string, data io.Reader) error {
func (u *Base) insertRowBinary(table string, data *io.PipeReader) error {
p, err := url.Parse(u.config.URL)
if err != nil {
return err
Expand Down

0 comments on commit db71271

Please sign in to comment.