From e5b98f1908cc6340d658e8efca538ac4b1ccb2b8 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 4 Jun 2024 15:33:52 -0700 Subject: [PATCH 1/3] Avoid work for already-canceled requests --- .../otelarrowexporter/internal/arrow/exporter.go | 10 ++++++++++ .../otelarrowexporter/internal/arrow/exporter_test.go | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go index 3976f998..bdd8bb8c 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -20,7 +20,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" ) // Exporter is 1:1 with exporter, isolates arrow-specific @@ -255,6 +257,14 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str // // consumer should fall back to standard OTLP, (true, nil) func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { + // If the incoming context is already cancaled, return the + // same error condition a unary gRPC or HTTP exporter would do. + select { + case <-ctx.Done(): + return false, status.Errorf(codes.Canceled, "context done before send: %v", ctx.Err()) + default: + } + errCh := make(chan error, 1) // Note that if the OTLP exporter's gRPC Headers field was diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 276e5f3f..5a37c1d5 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -31,7 +31,9 @@ import ( "go.uber.org/zap/zaptest" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer} @@ -280,6 +282,13 @@ func TestArrowExporterTimeout(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, context.Canceled)) + // Repeat the request, will get immediate timeout. + sent, err = tc.exporter.SendAndWait(ctx, twoTraces) + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, "context done before send: context canceled", stat.Message()) + require.Equal(t, codes.Canceled, stat.Code()) + require.NoError(t, tc.exporter.Shutdown(ctx)) }) } From 2c6c67a69432ac087e240e7e7878eb24e52d0eba Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 5 Jun 2024 07:38:50 -0700 Subject: [PATCH 2/3] Update collector/exporter/otelarrowexporter/internal/arrow/exporter.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Laurent Quérel --- collector/exporter/otelarrowexporter/internal/arrow/exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go index bdd8bb8c..c406a6d1 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -257,7 +257,7 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str // // consumer should fall back to standard OTLP, (true, nil) func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { - // If the incoming context is already cancaled, return the + // If the incoming context is already canceled, return the // same error condition a unary gRPC or HTTP exporter would do. select { case <-ctx.Done(): From 673e69a18251c6dd77807f311cc17ad39ba581a5 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 5 Jun 2024 09:10:22 -0700 Subject: [PATCH 3/3] merge fix --- .../exporter/otelarrowexporter/internal/arrow/exporter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 92883cfd..8769d1e8 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -286,7 +286,7 @@ func TestArrowExporterTimeout(t *testing.T) { // Repeat the request, will get immediate timeout. sent, err = tc.exporter.SendAndWait(ctx, twoTraces) - stat, is := status.FromError(err) + stat, is = status.FromError(err) require.True(t, is, "is a gRPC status error: %v", err) require.Equal(t, "context done before send: context canceled", stat.Message()) require.Equal(t, codes.Canceled, stat.Code())