Skip to content

Commit

Permalink
Use gRPC Status codes in the Arrow exporter (#211)
Browse files Browse the repository at this point in the history
Part of #210.

Mainly, changes the use of `fmt.Errorf()` and bare
`context.Context.Err()` values, uses gRPC-Go's `status.Errorf()` to wrap
the error with a code that gRPC and its consumers recognize. The code
was out-of-line with the design of the top-level directory. Whereas the
OTel-Arrow exporter had been inserting consumererror.NewPermanent()
wrappers, it is the Exporter module which supports standard OTLP and
Arrow one layer up that is responsible for permanent error labeling.
Returning gRPC status errors is always preferred to fmt.Errorf in gRPC
components.


Secondly, re-order and rename of the fields passed to the "arrow stream
error" log statement, so that it matches the Receiver. This is used as
the basis of a test for logging consistency and was otherwise an
unintentional disagreement ("which" and "where").
  • Loading branch information
jmacd authored Jun 5, 2024
1 parent e3533c2 commit 156162c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"runtime"
"sort"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write.
Expand Down Expand Up @@ -114,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro
case <-lp.done:
return ErrStreamRestarting
case <-ctx.Done():
return context.Canceled
return status.Errorf(codes.Canceled, "stream wait: %v", ctx.Err())
case lp.input <- wri:
return waitForWrite(ctx, errCh, lp.done)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

// Exporter is 1:1 with exporter, isolates arrow-specific
Expand Down Expand Up @@ -340,7 +342,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{})
select {
case <-ctx.Done():
// This caller's context timed out.
return ctx.Err()
return status.Errorf(codes.Canceled, "send wait: %v", ctx.Err())
case <-down:
return ErrStreamRestarting
case err := <-errCh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package arrow
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -31,7 +30,9 @@ import (
"go.uber.org/zap/zaptest"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer}
Expand Down Expand Up @@ -278,7 +279,10 @@ func TestArrowExporterTimeout(t *testing.T) {
sent, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.True(t, sent)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status")
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(ctx))
})
Expand Down Expand Up @@ -406,7 +410,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) {
}()
_, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(bg))
})
Expand Down Expand Up @@ -489,7 +496,10 @@ func TestArrowExporterStreamRace(t *testing.T) {
// This blocks until the cancelation.
_, err := tc.exporter.SendAndWait(callctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())
}()
}

Expand Down
36 changes: 17 additions & 19 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -135,9 +133,9 @@ func (s *Stream) setBatchChannel(batchID int64, errCh chan<- error) {
s.workState.waiters[batchID] = errCh
}

// logStreamError decides how to log an error. `which` indicates the
// stream direction, will be "reader" or "writer".
func (s *Stream) logStreamError(which string, err error) {
// logStreamError decides how to log an error. `where` indicates the
// error location, will be "reader" or "writer".
func (s *Stream) logStreamError(where string, err error) {
var code codes.Code
var msg string
// gRPC tends to supply status-wrapped errors, so we always
Expand All @@ -156,9 +154,9 @@ func (s *Stream) logStreamError(which string, err error) {
msg = err.Error()
}
if code == codes.Canceled {
s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("which", which), zap.String("message", msg))
s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where))
} else {
s.telemetry.Logger.Error("arrow stream error", zap.String("which", which), zap.String("message", msg), zap.Int("code", int(code)))
s.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where))
}
}

Expand Down Expand Up @@ -274,7 +272,7 @@ func (s *Stream) write(ctx context.Context) (retErr error) {
return nil
case wri = <-s.workState.toWrite:
case <-ctx.Done():
return ctx.Err()
return status.Errorf(codes.Canceled, "stream input: %v", ctx.Err())
}

err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc)
Expand Down Expand Up @@ -319,8 +317,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
if err != nil {
// This is some kind of internal error. We will restart the
// stream and mark this record as a permanent one.
err = fmt.Errorf("encode: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
err = status.Errorf(codes.Internal, "encode: %v", err)
wri.errCh <- err
return err
}

Expand All @@ -336,8 +334,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
// This case is like the encode-failure case
// above, we will restart the stream but consider
// this a permanent error.
err = fmt.Errorf("hpack: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
err = status.Errorf(codes.Internal, "hpack: %v", err)
wri.errCh <- err
return err
}
}
Expand Down Expand Up @@ -382,24 +380,24 @@ func (s *Stream) read(_ context.Context) error {
}

if err = s.processBatchStatus(resp); err != nil {
return fmt.Errorf("process: %w", err)
return err
}
}
}

// getSenderChannel takes the stream lock and removes the corresponding
// sender channel.
func (sws *streamWorkState) getSenderChannel(status *arrowpb.BatchStatus) (chan<- error, error) {
func (sws *streamWorkState) getSenderChannel(bstat *arrowpb.BatchStatus) (chan<- error, error) {
sws.lock.Lock()
defer sws.lock.Unlock()

ch, ok := sws.waiters[status.BatchId]
ch, ok := sws.waiters[bstat.BatchId]
if !ok {
// Will break the stream.
return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId)
return nil, status.Errorf(codes.Internal, "unrecognized batch ID: %d", bstat.BatchId)
}

delete(sws.waiters, status.BatchId)
delete(sws.waiters, bstat.BatchId)
return ch, nil
}

Expand Down Expand Up @@ -460,7 +458,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error
zap.Reflect("recovered", err),
zap.Stack("stacktrace"),
)
retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err)
retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err)
}
}()
var batch *arrowpb.BatchArrowRecords
Expand All @@ -473,7 +471,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error
case pmetric.Metrics:
batch, err = s.producer.BatchArrowRecordsFromMetrics(data)
default:
return nil, fmt.Errorf("unsupported OTLP type: %T", records)
return nil, status.Errorf(codes.Unimplemented, "unsupported OTel-Arrow signal type: %T", records)
}
return batch, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var oneBatch = &arrowpb.BatchArrowRecords{
Expand Down Expand Up @@ -182,8 +183,12 @@ func TestStreamEncodeError(t *testing.T) {
// sender should get a permanent testErr
err := tc.mustSendAndWait()
require.Error(t, err)
require.True(t, errors.Is(err, testErr))
require.True(t, consumererror.IsPermanent(err))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Internal, stat.Code())

require.Contains(t, stat.Message(), testErr.Error())
})
}
}
Expand Down

0 comments on commit 156162c

Please sign in to comment.