From 21add9a952a9ae38d7105b505351bb933b7c920d Mon Sep 17 00:00:00 2001 From: Moh Osman <59479562+moh-osman3@users.noreply.github.com> Date: Wed, 12 Jun 2024 09:46:32 -0400 Subject: [PATCH] Fix missing maxStreamLifetime timeout (#222) Fix missing maxStreamLifetime by checking `stream.workState.maxStreamLifetime` instead of `stream.maxStreamLifetime`, because `stream.maxStreamLifetime` is never set anywhere. This PR also fixes missing `otel_arrow_stream_recv` parent spans in recently observed traces. --- .../otelarrowexporter/internal/arrow/stream.go | 10 ++-------- .../otelarrowexporter/internal/arrow/stream_test.go | 2 -- .../receiver/otelarrowreceiver/internal/arrow/arrow.go | 2 +- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index ae991241..6142e5c0 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -32,12 +32,6 @@ import ( // Stream is 1:1 with gRPC stream. type Stream struct { - // maxStreamLifetime is the max timeout before stream - // should be closed on the client side. This ensures a - // graceful shutdown before max_connection_age is reached - // on the server side. - maxStreamLifetime time.Duration - // producer is exclusive to the holder of the stream. producer arrowRecord.ProducerAPI @@ -257,8 +251,8 @@ func (s *Stream) write(ctx context.Context) (retErr error) { hdrsEnc := hpack.NewEncoder(&hdrsBuf) var timerCh <-chan time.Time - if s.maxStreamLifetime != 0 { - timer := time.NewTimer(s.maxStreamLifetime) + if s.workState.maxStreamLifetime != 0 { + timer := time.NewTimer(s.workState.maxStreamLifetime) timerCh = timer.C defer timer.Stop() } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go index f653cc88..416f8086 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream_test.go @@ -54,7 +54,6 @@ func newStreamTestCase(t *testing.T, pname PrioritizerName) *streamTestCase { ctc.requestMetadataCall.AnyTimes().Return(nil, nil) stream := newStream(producer, prio, ctc.telset, netstats.Noop{}, state[0]) - stream.maxStreamLifetime = 10 * time.Second fromTracesCall := producer.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).Times(0) fromMetricsCall := producer.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).Times(0) @@ -144,7 +143,6 @@ func TestStreamNoMaxLifetime(t *testing.T) { t.Run(string(pname), func(t *testing.T) { tc := newStreamTestCase(t, pname) - tc.stream.maxStreamLifetime = 0 tc.fromTracesCall.Times(1).Return(oneBatch, nil) tc.closeSendCall.Times(0) diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index 5515b7fe..02a8f7de 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -555,7 +555,7 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre defer flight.recvDone(inflightCtx, &retErr) // this span is a child of the inflight, covering the Arrow decode, Auth, etc. - _, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv") + inflightCtx, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv") defer span.End() if err != nil {