diff --git a/pkg/otel/arrow_record/consumer.go b/pkg/otel/arrow_record/consumer.go index 8bb67172..62eff883 100644 --- a/pkg/otel/arrow_record/consumer.go +++ b/pkg/otel/arrow_record/consumer.go @@ -17,6 +17,7 @@ package arrow_record import ( "bytes" "context" + "errors" "fmt" "log" "math/rand" @@ -60,9 +61,12 @@ type ConsumerAPI interface { var _ ConsumerAPI = &Consumer{} -var ErrConsumerMemoryLimit = fmt.Errorf( - "The number of decoded records is smaller than the number of received payloads. " + - "Please increase the memory limit of the consumer.") +// ErrConsumerMemoryLimit is used by calling code to check +// errors.Is(err, ErrConsumerMemoryLimit). It is never returned. +var ErrConsumerMemoryLimit error = common.LimitError{} + +var errConsumerInternalError = errors.New( + "internal error: number of decoded records is smaller than the number of received payloads") // Consumer is a BatchArrowRecords consumer. type Consumer struct { @@ -311,12 +315,15 @@ func (c *Consumer) TracesFrom(bar *colarspb.BatchArrowRecords) ([]ptrace.Traces, // Consume takes a BatchArrowRecords protobuf message and returns an array of RecordMessage. // Note: the records wrapped in the RecordMessage must be released after use by the caller. -func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.RecordMessage, error) { +func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) (ibes []*record_message.RecordMessage, retErr error) { ctx := context.Background() - var ibes []*record_message.RecordMessage defer func() { c.recordsCounter.Add(ctx, int64(len(ibes)), c.metricOpts()...) + if retErr != nil { + releaseRecords(ibes) + ibes = nil + } }() // Transform each individual OtlpArrowPayload into RecordMessage @@ -356,8 +363,7 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R ipc.WithZstd(), ) if err != nil { - releaseRecords(ibes) - return nil, werror.Wrap(err) + return ibes, werror.Wrap(distinguishMemoryError(err)) } sc.ipcReader = ipcReader } @@ -370,16 +376,27 @@ func (c *Consumer) Consume(bar *colarspb.BatchArrowRecords) ([]*record_message.R rec.Retain() ibes = append(ibes, record_message.NewRecordMessage(bar.BatchId, payload.GetType(), rec)) } + + if err := sc.ipcReader.Err(); err != nil { + return ibes, werror.Wrap(distinguishMemoryError(err)) + } } if len(ibes) < len(bar.ArrowPayloads) { - releaseRecords(ibes) - return nil, ErrConsumerMemoryLimit + return ibes, werror.Wrap(errConsumerInternalError) } return ibes, nil } +func distinguishMemoryError(err error) error { + limErr, ok := common.NewLimitErrorFromError(err) + if ok { + return limErr + } + return err +} + type runtimeChecker struct{} var _ memory.TestingT = &runtimeChecker{} diff --git a/pkg/otel/common/arrow/allocator.go b/pkg/otel/common/arrow/allocator.go index dac4e71b..6ac7d304 100644 --- a/pkg/otel/common/arrow/allocator.go +++ b/pkg/otel/common/arrow/allocator.go @@ -16,7 +16,8 @@ package arrow import ( "fmt" - "os" + "regexp" + "strconv" "github.com/apache/arrow/go/v14/arrow/memory" ) @@ -44,8 +45,34 @@ type LimitError struct { var _ error = LimitError{} +var limitRegexp = regexp.MustCompile(`memory limit exceeded: requested (\d+) out of (\d+) \(in-use=(\d+)\)`) + +// NewLimitErrorFromError extracts a formatted limit error. +// +// Note: the arrow/go package (as of v16) has a panic recovery +// mechanism which formats the error object raised through panic in +// the code below. The formatting uses a "%v" which means we lose the +// error wrapping facility that would let us easily extract the +// object. Therefore, we use a regexp to unpack memory limit errors. +func NewLimitErrorFromError(err error) (error, bool) { + matches := limitRegexp.FindStringSubmatch(err.Error()) + if len(matches) != 4 { + return err, false + } + + req, _ := strconv.ParseUint(matches[1], 10, 64) + lim, _ := strconv.ParseUint(matches[2], 10, 64) + inuse, _ := strconv.ParseUint(matches[3], 10, 64) + + return LimitError{ + Request: req, + Inuse: inuse, + Limit: lim, + }, true +} + func (le LimitError) Error() string { - return fmt.Sprintf("allocation size %d exceeds limit %d (in-use=%d)", le.Request, le.Limit, le.Inuse) + return fmt.Sprintf("memory limit exceeded: requested %d out of %d (in-use=%d)", le.Request, le.Limit, le.Inuse) } func (_ LimitError) Is(tgt error) bool { @@ -65,9 +92,6 @@ func (l *LimitedAllocator) Allocate(size int) []byte { Inuse: l.inuse, Limit: l.limit, } - // Write the error to stderr so that it is visible even if the - // panic is caught. - os.Stderr.WriteString(err.Error() + "\n") panic(err) } @@ -86,9 +110,6 @@ func (l *LimitedAllocator) Reallocate(size int, b []byte) []byte { Inuse: l.inuse, Limit: l.limit, } - // Write the error to stderr so that it is visible even if the - // panic is caught. - os.Stderr.WriteString(err.Error() + "\n") panic(err) } diff --git a/pkg/otel/common/arrow/allocator_test.go b/pkg/otel/common/arrow/allocator_test.go index cc28564f..4362bf26 100644 --- a/pkg/otel/common/arrow/allocator_test.go +++ b/pkg/otel/common/arrow/allocator_test.go @@ -16,13 +16,14 @@ package arrow import ( "errors" + "fmt" "testing" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/stretchr/testify/require" ) -func TestLimitedAllocator(t *testing.T) { +func TestLimitedAllocatorUnformatted(t *testing.T) { const boundary = 1000000 check := memory.NewCheckedAllocator(memory.NewGoAllocator()) limit := NewLimitedAllocator(check, boundary) @@ -47,9 +48,26 @@ func TestLimitedAllocator(t *testing.T) { }() require.NotNil(t, capture) require.True(t, errors.Is(capture.(error), LimitError{})) - require.Equal(t, "allocation size 1 exceeds limit 1000000 (in-use=1000000)", capture.(error).Error()) + require.Equal(t, "memory limit exceeded: requested 1 out of 1000000 (in-use=1000000)", capture.(error).Error()) limit.Free(b) check.AssertSize(t, 0) } + +func TestLimitedAllocatorFormatted(t *testing.T) { + // Arrow does not wrap the error, so the consumer sees its + // formatted version. + expect := LimitError{ + Request: 1000, + Inuse: 9900, + Limit: 10000, + } + + unwrap, ok := NewLimitErrorFromError(fmt.Errorf("some sort of prefix %v some sort of suffix", expect)) + require.Error(t, unwrap) + require.True(t, ok) + require.Equal(t, expect, unwrap) + + require.True(t, errors.Is(unwrap, LimitError{})) +}