diff --git a/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/RabbitClient.java b/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/RabbitClient.java index 2f94b5fb..b6f7ad4d 100644 --- a/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/RabbitClient.java +++ b/engine/src/main/java/de/digitalcollections/flusswerk/engine/messagebroker/RabbitClient.java @@ -30,7 +30,7 @@ class RabbitClient { private static final boolean SINGLE_MESSAGE = false; - private final Channel channel; + private Channel channel; private final ObjectMapper objectMapper; @@ -64,8 +64,8 @@ void sendRaw(String exchange, String routingKey, byte[] data) throws IOException try { channel.basicPublish(exchange, routingKey, properties, data); - } catch (IOException e) { - waitForConnection("Could not publish message to " + routingKey); + } catch (Exception e) { + tryToReconnect("Could not publish message to " + routingKey); channel.basicPublish(exchange, routingKey, properties, data); } } @@ -81,15 +81,16 @@ byte[] serialize(Message message) throws IOException { public void ack(Message message) throws IOException { try { channel.basicAck(message.getEnvelope().getDeliveryTag(), SINGLE_MESSAGE); - } catch (IOException e) { - waitForConnection("Could not ack message"); + } catch (Exception e) { + tryToReconnect("Could not ack message"); channel.basicAck(message.getEnvelope().getDeliveryTag(), SINGLE_MESSAGE); } } - private void waitForConnection(String errorMessage) throws IOException { + private void tryToReconnect(String errorMessage) throws IOException { try { connection.waitForConnection(); + channel = connection.getChannel(); } catch (IOException e) { throw new IOException(errorMessage, e); } @@ -99,8 +100,8 @@ public Message receive(String queueName) throws IOException, InvalidMessageExcep GetResponse response; try { response = channel.basicGet(queueName, NO_AUTO_ACK); - } catch (IOException e) { - waitForConnection("Could not receive message from " + queueName); + } catch (Exception e) { + tryToReconnect("Could not receive message from " + queueName); response = channel.basicGet(queueName, NO_AUTO_ACK); } if (response != null) { @@ -127,8 +128,8 @@ public void provideExchange(String exchange) throws IOException { GetResponse response; try { channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, DURABLE); - } catch (IOException e) { - waitForConnection("Could not declare exchange"); + } catch (Exception e) { + tryToReconnect("Could not declare exchange"); channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, DURABLE); } } @@ -141,8 +142,8 @@ public void declareQueue(String name, String exchange, String routingKey, Map args) throws IOException { try { channel.queueDeclare(name, DURABLE, NOT_EXCLUSIVE, NO_AUTO_DELETE, args); - } catch (IOException e) { - waitForConnection("Could not declare queue"); + } catch (Exception e) { + tryToReconnect("Could not declare queue"); channel.queueDeclare(name, DURABLE, NOT_EXCLUSIVE, NO_AUTO_DELETE, args); } } @@ -150,8 +151,8 @@ public void createQueue(String name, Map args) throws IOExceptio public void bindQueue(String name, String exchange, String routingKey) throws IOException { try { channel.queueBind(name, exchange, routingKey); - } catch (IOException e) { - waitForConnection("Could not bind queue to exchange"); + } catch (Exception e) { + tryToReconnect("Could not bind queue to exchange"); channel.queueBind(name, exchange, routingKey); } }