diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go index 6c7728cf0a..465ee3db2d 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -29,13 +29,20 @@ const toPrefix = "To__" type ProducerInterceptor struct { } -func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) { +func (t *ProducerInterceptor) BeforeSend( + ctx context.Context, + producer pulsar.Producer, + message *pulsar.ProducerMessage, +) context.Context { buildAndInjectSpan(message, producer).Finish() + return ctx } -func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, - message *pulsar.ProducerMessage, - msgID pulsar.MessageID) { +func (t *ProducerInterceptor) OnSendAcknowledgement( + _ context.Context, + _ pulsar.Producer, + _ *pulsar.ProducerMessage, + _ pulsar.MessageID) { } func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span { diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index e18994cfcd..7358349ef5 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -17,27 +17,30 @@ package pulsar +import "context" + type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the // message. - BeforeSend(producer Producer, message *ProducerMessage) + BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor -func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { +func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context { for i := range x { - x[i].BeforeSend(producer, message) + ctx = x[i].BeforeSend(ctx, producer, message) } + return ctx } -func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { for i := range x { - x[i].OnSendAcknowledgement(producer, message, msgID) + x[i].OnSendAcknowledgement(ctx, producer, message, msgID) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 46167d0cf1..c99a669360 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -994,7 +994,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes isDone := uAtomic.NewBool(false) doneCh := make(chan struct{}) - p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { + ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { if isDone.CAS(false, true) { err = e msgID = ID @@ -1194,11 +1194,11 @@ func (p *partitionProducer) internalSendAsync( msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool, -) { +) context.Context { if err := p.validateMsg(msg); err != nil { p.log.Error(err) runCallback(callback, nil, msg, err) - return + return ctx } sr := sendRequestPool.Get().(*sendRequest) @@ -1216,26 +1216,27 @@ func (p *partitionProducer) internalSendAsync( if err := p.prepareTransaction(sr); err != nil { sr.done(nil, err) - return + return ctx } if p.getProducerState() != producerReady { sr.done(nil, errProducerClosed) - return + return ctx } - p.options.Interceptors.BeforeSend(p, msg) + ctx = p.options.Interceptors.BeforeSend(ctx, p, msg) + sr.ctx = ctx if err := p.updateSchema(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.updateUncompressedPayload(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.updateMetaData(sr) @@ -1243,16 +1244,18 @@ func (p *partitionProducer) internalSendAsync( if err := p.updateChunkInfo(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.reserveResources(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.dataChan <- sr + + return ctx } func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { @@ -1497,7 +1500,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) { if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { if sr.producer.options.Interceptors != nil { - sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID) } } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0f89069243..c115faa55e 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1480,23 +1480,38 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context { + return ctx +} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { } -// copyPropertyIntercepotr copy all keys in message properties map and add a suffix -type metricProduceInterceptor struct { - sendn int - ackn int +type trackingProduceInterceptor struct { + sendn int + ackn int + maxDuration time.Duration } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { - x.sendn++ +type beforeSendCtxKey struct{} + +func (i *trackingProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, msg *ProducerMessage) context.Context { + i.sendn++ + ctx = context.WithValue(ctx, beforeSendCtxKey{}, time.Now()) + return ctx } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { - x.ackn++ +func (i *trackingProduceInterceptor) OnSendAcknowledgement(ctx context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { + var dur time.Duration + if v := ctx.Value(beforeSendCtxKey{}); v != nil { + dur = time.Since(v.(time.Time)) + } + + if dur > i.maxDuration { + i.maxDuration = dur + } + + i.ackn++ } func TestProducerWithInterceptors(t *testing.T) { @@ -1519,14 +1534,14 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Nil(t, err) defer consumer.Close() - metric := &metricProduceInterceptor{} + interceptor := &trackingProduceInterceptor{} // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, DisableBatching: false, Interceptors: ProducerInterceptors{ noopProduceInterceptor{}, - metric, + interceptor, }, }) assert.Nil(t, err) @@ -1576,8 +1591,9 @@ func TestProducerWithInterceptors(t *testing.T) { consumer.Ack(msg) } - assert.Equal(t, 10, metric.sendn) - assert.Equal(t, 10, metric.ackn) + assert.Equal(t, 10, interceptor.sendn) + assert.Equal(t, 10, interceptor.ackn) + assert.NotZero(t, interceptor.maxDuration) } func TestProducerSendAfterClose(t *testing.T) { @@ -1720,7 +1736,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2", @@ -1811,7 +1827,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2",