From 436c3d09f6b478a171272bf0c3210805cd7bc374 Mon Sep 17 00:00:00 2001 From: billowqiu Date: Wed, 21 Sep 2022 22:08:24 +0800 Subject: [PATCH] 1: fix https://github.com/apache/pulsar-client-go/issues/850 2: add some log for pc --- pulsar/consumer_impl.go | 12 +++++++++--- pulsar/consumer_partition.go | 14 +++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 03c6f66647..ec79aedc29 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -463,7 +463,9 @@ func (c *consumer) AckID(msgID MessageID) error { } if mid.consumer != nil { - return mid.Ack() + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + return mid.Ack() + } } return c.consumers[mid.partitionIdx].AckID(mid) @@ -526,7 +528,9 @@ func (c *consumer) Nack(msg Message) error { } if mid.consumer != nil { - return mid.Nack() + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + return mid.Nack() + } } return c.consumers[mid.partitionIdx].NackMsg(msg) } @@ -541,7 +545,9 @@ func (c *consumer) NackID(msgID MessageID) error { } if mid.consumer != nil { - return mid.Nack() + if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady { + return mid.Nack() + } } return c.consumers[mid.partitionIdx].NackID(mid) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 2ccf2fec6a..818c018b0b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -311,7 +311,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { if !msgID.Undefined() && msgID.ack() { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to ack message on closing or closed consumer") + pc.log.WithField("cnx", pc._getConn().ID()).WithField("state", state).Error("Failed to ack message on closing or closed consumer") return newError(ConsumerClosed, "consumer closed") } @@ -329,7 +329,7 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error { func (pc *partitionConsumer) NackID(msgID trackingMessageID) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to nack message on closing or closed consumer") + pc.log.WithField("cnx", pc._getConn().ID()).WithField("state", state).Error("Failed to nack message on closing or closed consumer") return newError(ConsumerClosed, "consumer closed") } pc.nackTracker.Add(msgID.messageID) @@ -371,7 +371,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { MessageIds: msgIDDataList, }) if err != nil { - pc.log.Errorf("request redeliver message: %v, error: %v", msgIds, err) + pc.log.WithField("cnx", pc._getConn().ID()).Errorf("request redeliver message: %v, error: %v", msgIds, err) } } @@ -509,7 +509,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck) if err != nil { - pc.log.Errorf("request internal ack message: %v, consumer: %d, error: %v", msgID.String(), pc.consumerID, err) + pc.log.WithField("cnx", pc._getConn().ID()).Errorf("request internal ack message: %v, consumer: %d, error: %v", msgID.String(), pc.consumerID, err) } } @@ -738,7 +738,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext { func (pc *partitionConsumer) ConnectionClosed() { // Trigger reconnection in the consumer goroutine - pc.log.Warn("connection closed and send to connectClosedCh") + pc.log.WithField("cnx", pc._getConn().ID()).Warn("connection closed and send to connectClosedCh") pc.connectClosedCh <- connectionClosed{} } @@ -851,7 +851,7 @@ func (pc *partitionConsumer) dispatcher() { } if time.Since(lastLogFlowTimestamp) > time.Minute { lastLogFlowTimestamp = time.Now() - pc.log.Infof("interval log requesting more permits=%d available=%d", requestedPermits, availablePermits) + pc.log.WithField("cnx", pc._getConn().ID()).Infof("interval log requesting more permits=%d available=%d", requestedPermits, availablePermits) } } @@ -993,7 +993,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { } pc.setConsumerState(consumerClosing) - pc.log.Infof("Closing consumer=%d", pc.consumerID) + pc.log.WithField("cnx", pc._getConn().ID()).Infof("Closing consumer=%d", pc.consumerID) requestID := pc.client.rpcClient.NewRequestID() cmdClose := &pb.CommandCloseConsumer{