Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call IPCReader.Err() after reader loop #215

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions pkg/otel/arrow_record/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package arrow_record
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"math/rand"
Expand Down Expand Up @@ -60,9 +61,12 @@ type ConsumerAPI interface {

var _ ConsumerAPI = &Consumer{}

var ErrConsumerMemoryLimit = fmt.Errorf(
"The number of decoded records is smaller than the number of received payloads. " +
"Please increase the memory limit of the consumer.")
// ErrConsumerMemoryLimit is used by calling code to check
// errors.Is(err, ErrConsumerMemoryLimit). It is never returned.
var ErrConsumerMemoryLimit error = common.LimitError{}

var errConsumerInternalError = errors.New(
"internal error: number of decoded records is smaller than the number of received payloads")

// Consumer is a BatchArrowRecords consumer.
type Consumer struct {
Expand Down Expand Up @@ -311,12 +315,15 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces,

// Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage.
// Note: the records wrapped in the RecordMessage must be released after use by the caller.
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) {
func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) (ibes []*record_message.RecordMessage, retErr error) {
ctx := context.Background()

var ibes []*record_message.RecordMessage
defer func() {
c.recordsCounter.Add(ctx, int64(len(ibes)), c.metricOpts()...)
if retErr != nil {
releaseRecords(ibes)
ibes = nil
}
}()

// Transform each individual OtlpArrowPayload into RecordMessage
Expand Down Expand Up @@ -356,8 +363,7 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R
ipc.WithZstd(),
)
if err != nil {
releaseRecords(ibes)
return nil, werror.Wrap(err)
return ibes, werror.Wrap(distinguishMemoryError(err))
}
sc.ipcReader = ipcReader
}
Expand All @@ -370,16 +376,27 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R
rec.Retain()
ibes = append(ibes, record_message.NewRecordMessage(bar.BatchId, payload.GetType(), rec))
}

if err := sc.ipcReader.Err(); err != nil {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
return ibes, werror.Wrap(distinguishMemoryError(err))
}
}

if len(ibes) < len(bar.ArrowPayloads) {
releaseRecords(ibes)
return nil, ErrConsumerMemoryLimit
return ibes, werror.Wrap(errConsumerInternalError)
}

return ibes, nil
}

// distinguishMemoryError passes err through common.NewLimitErrorFromError
// and returns the limit error it produces when one is recognized;
// otherwise err is returned unchanged.
func distinguishMemoryError(err error) error {
	if limitErr, isLimit := common.NewLimitErrorFromError(err); isLimit {
		return limitErr
	}
	return err
}

type runtimeChecker struct{}

var _ memory.TestingT = &runtimeChecker{}
Expand Down
37 changes: 29 additions & 8 deletions pkg/otel/common/arrow/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package arrow

import (
"fmt"
"os"
"regexp"
"strconv"

"github.com/apache/arrow/go/v14/arrow/memory"
)
Expand Down Expand Up @@ -44,8 +45,34 @@ type LimitError struct {

var _ error = LimitError{}

var limitRegexp = regexp.MustCompile(`memory limit exceeded: requested (\d+) out of (\d+) \(in-use=(\d+)\)`)

// NewLimitErrorFromError extracts a formatted limit error.
//
// Note: the arrow/go package (as of v16) has a panic recovery
// mechanism which formats the error object raised through panic in
// the code below. The formatting uses a "%v" which means we lose the
// error wrapping facility that would let us easily extract the
// object. Therefore, we use a regexp to unpack memory limit errors.
jmacd marked this conversation as resolved.
Show resolved Hide resolved
// NewLimitErrorFromError searches the message of err for a formatted
// memory-limit error. When one is found it returns the reconstructed
// LimitError and true; otherwise it returns err unchanged and false.
// A nil err yields (nil, false) instead of panicking.
func NewLimitErrorFromError(err error) (error, bool) {
	// Calling Error() on a nil interface value would panic.
	if err == nil {
		return nil, false
	}

	// A successful match yields the full match plus three capture groups.
	matches := limitRegexp.FindStringSubmatch(err.Error())
	if len(matches) != 4 {
		return err, false
	}

	// limitRegexp captures only runs of decimal digits, so the sole
	// ParseUint failure possible here is a range error, in which case
	// the returned value saturates at the maximum uint64. That is good
	// enough for a diagnostic, so the errors are deliberately ignored.
	req, _ := strconv.ParseUint(matches[1], 10, 64)
	lim, _ := strconv.ParseUint(matches[2], 10, 64)
	inuse, _ := strconv.ParseUint(matches[3], 10, 64)

	return LimitError{
		Request: req,
		Inuse:   inuse,
		Limit:   lim,
	}, true
}

func (le LimitError) Error() string {
return fmt.Sprintf("allocation size %d exceeds limit %d (in-use=%d)", le.Request, le.Limit, le.Inuse)
return fmt.Sprintf("memory limit exceeded: requested %d out of %d (in-use=%d)", le.Request, le.Limit, le.Inuse)
}

func (_ LimitError) Is(tgt error) bool {
Expand All @@ -65,9 +92,6 @@ func (l *LimitedAllocator) Allocate(size int) []byte {
Inuse: l.inuse,
Limit: l.limit,
}
// Write the error to stderr so that it is visible even if the
// panic is caught.
os.Stderr.WriteString(err.Error() + "\n")
panic(err)
}

Expand All @@ -86,9 +110,6 @@ func (l *LimitedAllocator) Reallocate(size int, b []byte) []byte {
Inuse: l.inuse,
Limit: l.limit,
}
// Write the error to stderr so that it is visible even if the
// panic is caught.
os.Stderr.WriteString(err.Error() + "\n")
panic(err)
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/otel/common/arrow/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package arrow

import (
"errors"
"fmt"
"testing"

"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/stretchr/testify/require"
)

func TestLimitedAllocator(t *testing.T) {
func TestLimitedAllocatorUnformatted(t *testing.T) {
const boundary = 1000000
check := memory.NewCheckedAllocator(memory.NewGoAllocator())
limit := NewLimitedAllocator(check, boundary)
Expand All @@ -47,9 +48,26 @@ func TestLimitedAllocator(t *testing.T) {
}()
require.NotNil(t, capture)
require.True(t, errors.Is(capture.(error), LimitError{}))
require.Equal(t, "allocation size 1 exceeds limit 1000000 (in-use=1000000)", capture.(error).Error())
require.Equal(t, "memory limit exceeded: requested 1 out of 1000000 (in-use=1000000)", capture.(error).Error())

limit.Free(b)

check.AssertSize(t, 0)
}

// TestLimitedAllocatorFormatted checks that NewLimitErrorFromError can
// rebuild a LimitError from an error whose message merely embeds the
// formatted limit error text.
func TestLimitedAllocatorFormatted(t *testing.T) {
	want := LimitError{
		Request: 1000,
		Inuse:   9900,
		Limit:   10000,
	}

	// Arrow does not wrap the error, so the consumer only ever sees the
	// "%v"-formatted text; build such an error with surrounding noise.
	formatted := fmt.Errorf("some sort of prefix %v some sort of suffix", want)

	got, matched := NewLimitErrorFromError(formatted)
	require.Error(t, got)
	require.True(t, matched)
	require.Equal(t, want, got)

	require.True(t, errors.Is(got, LimitError{}))
}
Loading