diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 0896ec4f97..308af9bc80 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -1231,3 +1231,19 @@ func TestMultipleCloseClient(t *testing.T) { client.Close() client.Close() } + +func TestConnectionClosedError(t *testing.T) { + c, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:1234", // fake address + ConnectionTimeout: 1 * time.Second, + OperationTimeout: 1 * time.Second, + }) + assert.NoError(t, err) + + _, err = c.CreateProducer(ProducerOptions{ + Topic: "my-topic", + }) + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "failed to connect to broker: dial tcp [::1]:1234: connect:"+ + " connection refused"), "error-message", err.Error()) +} diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go index cd236ecc22..03d1c7d21a 100644 --- a/pulsar/consumer_multitopic_test.go +++ b/pulsar/consumer_multitopic_test.go @@ -358,7 +358,7 @@ func (dummyConnection) GetMaxMessageSize() int32 { func (dummyConnection) Close() { } -func (dummyConnection) WaitForClose() <-chan struct{} { +func (dummyConnection) WaitForClose() <-chan error { return nil } @@ -366,6 +366,10 @@ func (dummyConnection) IsProxied() bool { return false } +func (dummyConnection) CloseWithErr(_ error) { + +} + func TestMultiTopicAckIDListTimeout(t *testing.T) { topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano()) assert.NoError(t, createPartitionedTopic(topic, 5)) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 3445f42543..8e95342635 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -121,7 +121,8 @@ func TestProducerConsumer(t *testing.T) { func TestConsumerConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ - URL: "pulsar://invalid-hostname:6650", + URL: "pulsar://invalid-hostname:6650", + OperationTimeout: 5 * time.Second, }) assert.Nil(t, err) @@ -137,7 +138,7 @@ func TestConsumerConnectError(t *testing.T) { assert.Nil(t, consumer) assert.NotNil(t, err) - assert.ErrorContains(t, err, "connection error") + assert.ErrorContains(t, err, "failed to connect to broker") } func TestBatchMessageReceive(t *testing.T) { diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 84c4323d9c..98413f56ef 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -86,7 +86,8 @@ type Connection interface { ID() string GetMaxMessageSize() int32 Close() - WaitForClose() <-chan struct{} + CloseWithErr(err error) + WaitForClose() <-chan error IsProxied() bool } @@ -161,7 +162,7 @@ type connection struct { incomingRequestsWG sync.WaitGroup incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd - closeCh chan struct{} + closeCh chan error readyCh chan struct{} writeRequestsCh chan Buffer @@ -206,7 +207,7 @@ func newConnection(opts connectionOptions) *connection { tlsOptions: opts.tls, auth: opts.auth, - closeCh: make(chan struct{}), + closeCh: make(chan error), readyCh: make(chan struct{}), incomingRequestsCh: make(chan *request, 10), incomingCmdCh: make(chan *incomingCmd, 10), @@ -282,7 +283,7 @@ func (c *connection) connect() bool { if err != nil { c.log.WithError(err).Warn("Failed to connect to broker.") - c.Close() + c.CloseWithErr(fmt.Errorf("failed to connect to broker: %w", err)) return false } @@ -367,9 +368,9 @@ func (c *connection) waitUntilReady() error { select { case <-c.readyCh: return nil - case <-c.closeCh: + case err := <-c.closeCh: // Connection has been closed while waiting for the readiness. - return errors.New("connection error") + return err } } @@ -1026,7 +1027,7 @@ func (c *connection) CheckIdle(maxIdleTime time.Duration) bool { return time.Since(c.lastActive) > maxIdleTime } -func (c *connection) WaitForClose() <-chan struct{} { +func (c *connection) WaitForClose() <-chan error { return c.closeCh } @@ -1034,6 +1035,10 @@ func (c *connection) WaitForClose() <-chan struct{} { // closing underlying socket connection and closeCh. // This also triggers callbacks to the ConnectionClosed listeners. func (c *connection) Close() { + c.CloseWithErr(nil) +} + +func (c *connection) CloseWithErr(err error) { c.closeOnce.Do(func() { listeners, consumerHandlers, cnx := c.closeAndEmptyObservers() @@ -1041,7 +1046,10 @@ func (c *connection) Close() { _ = cnx.Close() } - close(c.closeCh) + go func() { + c.closeCh <- err + close(c.closeCh) + }() // notify producers connection closed for _, listener := range listeners { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 24939a82b4..f1bdba8e7e 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -58,7 +58,8 @@ func TestInvalidURL(t *testing.T) { func TestProducerConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ - URL: "pulsar://invalid-hostname:6650", + URL: "pulsar://invalid-hostname:6650", + OperationTimeout: 5 * time.Second, }) assert.Nil(t, err) @@ -73,7 +74,7 @@ func TestProducerConnectError(t *testing.T) { assert.Nil(t, producer) assert.NotNil(t, err) - assert.ErrorContains(t, err, "connection error") + assert.ErrorContains(t, err, "failed to connect to broker") } func TestProducerNoTopic(t *testing.T) { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 836535704b..63bdbc9b14 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -205,7 +205,8 @@ func TestReaderOnPartitionedTopic(t *testing.T) { func TestReaderConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ - URL: "pulsar://invalid-hostname:6650", + URL: "pulsar://invalid-hostname:6650", + OperationTimeout: 5 * time.Second, }) assert.Nil(t, err) @@ -221,7 +222,7 @@ func TestReaderConnectError(t *testing.T) { assert.Nil(t, reader) assert.NotNil(t, err) - assert.ErrorContains(t, err, "connection error") + assert.ErrorContains(t, err, "failed to connect to broker") } func TestReaderOnSpecificMessage(t *testing.T) {