Skip to content

Commit

Permalink
Fix reconnecting behavior when RabbitMQ is restarted
Browse files Browse the repository at this point in the history
  • Loading branch information
bitzl authored and clorenz committed May 7, 2019
1 parent 861b7d9 commit 9712301
Showing 1 changed file with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RabbitClient {

private static final boolean SINGLE_MESSAGE = false;

private final Channel channel;
private Channel channel;

private final ObjectMapper objectMapper;

Expand Down Expand Up @@ -64,8 +64,8 @@ void sendRaw(String exchange, String routingKey, byte[] data) throws IOException

try {
channel.basicPublish(exchange, routingKey, properties, data);
} catch (IOException e) {
waitForConnection("Could not publish message to " + routingKey);
} catch (Exception e) {
tryToReconnect("Could not publish message to " + routingKey);
channel.basicPublish(exchange, routingKey, properties, data);
}
}
Expand All @@ -81,15 +81,16 @@ byte[] serialize(Message message) throws IOException {
public void ack(Message message) throws IOException {
try {
channel.basicAck(message.getEnvelope().getDeliveryTag(), SINGLE_MESSAGE);
} catch (IOException e) {
waitForConnection("Could not ack message");
} catch (Exception e) {
tryToReconnect("Could not ack message");
channel.basicAck(message.getEnvelope().getDeliveryTag(), SINGLE_MESSAGE);
}
}

private void waitForConnection(String errorMessage) throws IOException {
private void tryToReconnect(String errorMessage) throws IOException {
try {
connection.waitForConnection();
channel = connection.getChannel();
} catch (IOException e) {
throw new IOException(errorMessage, e);
}
Expand All @@ -99,8 +100,8 @@ public Message receive(String queueName) throws IOException, InvalidMessageExcep
GetResponse response;
try {
response = channel.basicGet(queueName, NO_AUTO_ACK);
} catch (IOException e) {
waitForConnection("Could not receive message from " + queueName);
} catch (Exception e) {
tryToReconnect("Could not receive message from " + queueName);
response = channel.basicGet(queueName, NO_AUTO_ACK);
}
if (response != null) {
Expand All @@ -127,8 +128,8 @@ public void provideExchange(String exchange) throws IOException {
GetResponse response;
try {
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, DURABLE);
} catch (IOException e) {
waitForConnection("Could not declare exchange");
} catch (Exception e) {
tryToReconnect("Could not declare exchange");
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, DURABLE);
}
}
Expand All @@ -141,17 +142,17 @@ public void declareQueue(String name, String exchange, String routingKey, Map<St
public void createQueue(String name, Map<String, Object> args) throws IOException {
try {
channel.queueDeclare(name, DURABLE, NOT_EXCLUSIVE, NO_AUTO_DELETE, args);
} catch (IOException e) {
waitForConnection("Could not declare queue");
} catch (Exception e) {
tryToReconnect("Could not declare queue");
channel.queueDeclare(name, DURABLE, NOT_EXCLUSIVE, NO_AUTO_DELETE, args);
}
}

public void bindQueue(String name, String exchange, String routingKey) throws IOException {
try {
channel.queueBind(name, exchange, routingKey);
} catch (IOException e) {
waitForConnection("Could not bind queue to exchange");
} catch (Exception e) {
tryToReconnect("Could not bind queue to exchange");
channel.queueBind(name, exchange, routingKey);
}
}
Expand Down

0 comments on commit 9712301

Please sign in to comment.