Skip to content

Commit

Permalink
Merge pull request #24 from billowqiu/fix_use_closed_consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
jayxhj authored Oct 18, 2022
2 parents fc0a56d + 436c3d0 commit 4410940
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
12 changes: 9 additions & 3 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ func (c *consumer) AckID(msgID MessageID) error {
}

if mid.consumer != nil {
return mid.Ack()
if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady {
return mid.Ack()
}
}

return c.consumers[mid.partitionIdx].AckID(mid)
Expand Down Expand Up @@ -526,7 +528,9 @@ func (c *consumer) Nack(msg Message) error {
}

if mid.consumer != nil {
return mid.Nack()
if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady {
return mid.Nack()
}
}
return c.consumers[mid.partitionIdx].NackMsg(msg)
}
Expand All @@ -541,7 +545,9 @@ func (c *consumer) NackID(msgID MessageID) error {
}

if mid.consumer != nil {
return mid.Nack()
if pc, ok := (mid.consumer).(*partitionConsumer); ok && pc.getConsumerState() == consumerReady {
return mid.Nack()
}
}

return c.consumers[mid.partitionIdx].NackID(mid)
Expand Down
14 changes: 7 additions & 7 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
if !msgID.Undefined() && msgID.ack() {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack message on closing or closed consumer")
pc.log.WithField("cnx", pc._getConn().ID()).WithField("state", state).Error("Failed to ack message on closing or closed consumer")
return newError(ConsumerClosed, "consumer closed")
}

Expand All @@ -329,7 +329,7 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {

func (pc *partitionConsumer) NackID(msgID trackingMessageID) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to nack message on closing or closed consumer")
pc.log.WithField("cnx", pc._getConn().ID()).WithField("state", state).Error("Failed to nack message on closing or closed consumer")
return newError(ConsumerClosed, "consumer closed")
}
pc.nackTracker.Add(msgID.messageID)
Expand Down Expand Up @@ -371,7 +371,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
MessageIds: msgIDDataList,
})
if err != nil {
pc.log.Errorf("request redeliver message: %v, error: %v", msgIds, err)
pc.log.WithField("cnx", pc._getConn().ID()).Errorf("request redeliver message: %v, error: %v", msgIds, err)
}
}

Expand Down Expand Up @@ -509,7 +509,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {

err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.Errorf("request internal ack message: %v, consumer: %d, error: %v", msgID.String(), pc.consumerID, err)
pc.log.WithField("cnx", pc._getConn().ID()).Errorf("request internal ack message: %v, consumer: %d, error: %v", msgID.String(), pc.consumerID, err)
}
}

Expand Down Expand Up @@ -738,7 +738,7 @@ func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {

func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
pc.log.Warn("connection closed and send to connectClosedCh")
pc.log.WithField("cnx", pc._getConn().ID()).Warn("connection closed and send to connectClosedCh")
pc.connectClosedCh <- connectionClosed{}
}

Expand Down Expand Up @@ -851,7 +851,7 @@ func (pc *partitionConsumer) dispatcher() {
}
if time.Since(lastLogFlowTimestamp) > time.Minute {
lastLogFlowTimestamp = time.Now()
pc.log.Infof("interval log requesting more permits=%d available=%d", requestedPermits, availablePermits)
pc.log.WithField("cnx", pc._getConn().ID()).Infof("interval log requesting more permits=%d available=%d", requestedPermits, availablePermits)
}
}

Expand Down Expand Up @@ -993,7 +993,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}

pc.setConsumerState(consumerClosing)
pc.log.Infof("Closing consumer=%d", pc.consumerID)
pc.log.WithField("cnx", pc._getConn().ID()).Infof("Closing consumer=%d", pc.consumerID)

requestID := pc.client.rpcClient.NewRequestID()
cmdClose := &pb.CommandCloseConsumer{
Expand Down

0 comments on commit 4410940

Please sign in to comment.