Skip to content

Commit

Permalink
Merge pull request #516 from dbmdz/channel_exception
Browse files Browse the repository at this point in the history
Automatically recover from channel-level exception
  • Loading branch information
schmika authored Dec 14, 2023
2 parents 1b237b3 + fdad0be commit 7772fad
Show file tree
Hide file tree
Showing 31 changed files with 360 additions and 222 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
- Automatically recover from channel-level exceptions. This involves a breaking change in the constructor of `FlusswerkConsumer`, which now requires a `RabbitClient` instead of a `Channel`.

## [6.0.1] - 2023-12-06

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ services:
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- 5672:5672
- 15672:15672
- "5672:5672"
- "15672:15672"
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Tracing tracing() {
public Flow flow(Optional<FlowSpec> flowSpec) {
// No FlowSpec → no Flow. We will have to handle this case when creating the
// Engine bean as the sole consumer of the Flow bean.
return flowSpec.map(spec -> new Flow(spec)).orElse(null);
return flowSpec.map(Flow::new).orElse(null);
}

@Bean
Expand Down Expand Up @@ -147,7 +147,7 @@ public FlusswerkMetrics metrics(
public List<FlusswerkConsumer> flusswerkConsumers(
FlusswerkObjectMapper flusswerkObjectMapper,
ProcessingProperties processingProperties,
RabbitConnection rabbitConnection,
RabbitClient rabbitClient,
RoutingProperties routingProperties,
PriorityBlockingQueue<Task> taskQueue) {
int maxPriority = routingProperties.getIncoming().size();
Expand All @@ -161,7 +161,7 @@ public List<FlusswerkConsumer> flusswerkConsumers(
flusswerkConsumers.add(
new FlusswerkConsumer(
availableWorkers,
rabbitConnection.getChannel(),
rabbitClient,
flusswerkObjectMapper,
queueName,
priority,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.dbmdz.flusswerk.framework.engine;

import com.github.dbmdz.flusswerk.framework.flow.Flow;
import com.github.dbmdz.flusswerk.framework.rabbitmq.ChannelListener;
import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -17,7 +18,7 @@
* Run flows {@link Flow} for every message from the {@link RabbitClient} - usually several in
* parallel.
*/
public class Engine {
public class Engine implements ChannelListener {

private static final Logger LOGGER = LoggerFactory.getLogger(Engine.class);

Expand Down Expand Up @@ -71,6 +72,9 @@ public void start() {
for (FlusswerkConsumer consumer : consumers) {
rabbitClient.consume(consumer, false);
}

// keep track of channel resets
this.rabbitClient.addChannelListener(this);
}

/**
Expand All @@ -93,7 +97,7 @@ public void stop() {
List<Task> remainingTasks = new ArrayList<>();
taskQueue.drainTo(remainingTasks);

// NACK and requeue all messages that have not be processed yet
// NACK and requeue all messages that have not been processed yet
for (var task : remainingTasks) {
long deliveryTag = task.getMessage().getEnvelope().getDeliveryTag();
try {
Expand All @@ -115,4 +119,12 @@ public void stop() {
LOGGER.error("Timeout awaiting worker shutdown after 5 minutes", e);
}
}

@Override
public void handleReset() {
LOGGER.debug("Register consumers again after channel reset");
for (FlusswerkConsumer consumer : consumers) {
rabbitClient.consume(consumer, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import com.github.dbmdz.flusswerk.framework.jackson.FlusswerkObjectMapper;
import com.github.dbmdz.flusswerk.framework.model.IncomingMessageType;
import com.github.dbmdz.flusswerk.framework.model.Message;
import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
Expand All @@ -21,14 +19,15 @@
* Receive AMQP message from a RabbitMQ queue, deserialize the Flusswerk {@link Message} and put
* that into the internal task queue.
*/
public class FlusswerkConsumer extends DefaultConsumer {
public class FlusswerkConsumer implements Consumer {

public static final FlusswerkObjectMapper FALLBACK_MAPPER =
new FlusswerkObjectMapper(new IncomingMessageType());
private static final Logger LOGGER = LoggerFactory.getLogger(FlusswerkConsumer.class);
private volatile String _consumerTag;

private final Semaphore availableWorkers;
private final Channel channel;
private final RabbitClient rabbitClient;
private final FlusswerkObjectMapper flusswerkObjectMapper;
private final PriorityBlockingQueue<Task> taskQueue;
private final int priority;
Expand All @@ -37,38 +36,65 @@ public class FlusswerkConsumer extends DefaultConsumer {
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
* @param rabbitClient the client which handles communication with RabbitMQ
* @param flusswerkObjectMapper the object mapper to deserialize messages
* @param inputQueue the rabbitMQ queue this consumer is bound to
*/
public FlusswerkConsumer(
Semaphore availableWorkers,
Channel channel,
RabbitClient rabbitClient,
FlusswerkObjectMapper flusswerkObjectMapper,
String inputQueue,
int priority,
PriorityBlockingQueue<Task> taskQueue) {
super(channel);
this.availableWorkers = availableWorkers;
this.channel = channel;
this.rabbitClient = rabbitClient;
this.flusswerkObjectMapper = flusswerkObjectMapper;
this.inputQueue = inputQueue;
this.priority = priority;
this.taskQueue = taskQueue;
}

@Override
public void handleConsumeOk(String consumerTag) {
this._consumerTag = consumerTag;
}

@Override
public void handleCancelOk(String consumerTag) {
// nothing to do
}

@Override
public void handleCancel(String consumerTag) {
// nothing to do
}

@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// nothing to do
}

@Override
public void handleRecoverOk(String consumerTag) {
// nothing to do
}

public String getConsumerTag() {
return this._consumerTag;
}

@Override
public void handleDelivery(
String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) {

try {
availableWorkers.acquire();
} catch (InterruptedException e) {
// If waiting for the semaphore is interrupted (e.g. because of shutdown), the current message
// should not be processed at all.
LOGGER.warn("FlusswerkConsumer interrupted while waiting for free worker", e);
channel.basicReject(envelope.getDeliveryTag(), true);
rabbitClient.reject(envelope, true);
return;
}

Expand All @@ -90,7 +116,7 @@ public void handleDelivery(
if (tracing != null) {
LOGGER.error("Could not deserialize message", kv("tracing", tracing), e);
}
channel.basicAck(envelope.getDeliveryTag(), false);
rabbitClient.ack(envelope.getDeliveryTag());
availableWorkers.release();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import java.io.IOException;

public interface ChannelCommand<T> {
T execute() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.rabbitmq.client.*;
import java.util.Map;

public class ChannelCommands {

private final Channel channel;

public ChannelCommands(Channel channel) {
this.channel = channel;
}

public ChannelCommand<Void> basicPublish(
String exchange, String routingKey, AMQP.BasicProperties properties, byte[] data) {
return () -> {
channel.basicPublish(exchange, routingKey, properties, data);
return null;
};
}

public ChannelCommand<Void> basicAck(long deliveryTag, boolean multiple) {
return () -> {
channel.basicAck(deliveryTag, multiple);
return null;
};
}

public ChannelCommand<Void> basicReject(long deliveryTag, boolean requeue) {
return () -> {
channel.basicReject(deliveryTag, requeue);
return null;
};
}

public ChannelCommand<GetResponse> basicGet(String queue, boolean autoAck) {
return () -> channel.basicGet(queue, autoAck);
}

public ChannelCommand<Void> basicConsume(String queue, boolean autoAck, Consumer consumer) {
return () -> {
channel.basicConsume(queue, autoAck, consumer);
return null;
};
}

public ChannelCommand<Void> exchangeDeclare(
String exchange, BuiltinExchangeType type, boolean durable) {
return () -> {
channel.exchangeDeclare(exchange, type, durable);
return null;
};
}

public ChannelCommand<Void> queueDeclare(
String name,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> args) {
return () -> {
channel.queueDeclare(name, durable, exclusive, autoDelete, args);
return null;
};
}

public ChannelCommand<Void> queueBind(String name, String exchange, String routingKey) {
return () -> {
channel.queueBind(name, exchange, routingKey);
return null;
};
}

public ChannelCommand<Long> messageCount(String queue) {
return () -> channel.messageCount(queue);
}

public ChannelCommand<AMQP.Queue.PurgeOk> queuePurge(String queue) {
return () -> channel.queuePurge(queue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

/** A ChannelListener receives notifications about channel recovery. */
public interface ChannelListener {
void handleReset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ public Message receive(String queueName, boolean autoAck) throws InvalidMessageE
* MessageBroker#ack(Message)}.
*
* @return the received message.
* @throws IOException if communication with RabbitMQ failed.
*/
public Message receive() throws IOException {
public Message receive() {
Message message = null;
for (String inputQueue : routingConfig.getIncoming()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException;
import com.github.dbmdz.flusswerk.framework.model.Message;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
Expand All @@ -17,34 +15,30 @@ public class Queue {
private static final Logger LOGGER = LoggerFactory.getLogger(Queue.class);

private final String name;
private final Channel channel;
private final RabbitClient rabbitClient;

Queue(String name, RabbitClient rabbitClient) {
this.name = requireNonNull(name);
this.channel = rabbitClient.getChannel();
this.rabbitClient = rabbitClient;
}

/**
* Removes all messages from a queue. These messages cannot be restored.
*
* @return the number of deleted messages
* @throws IOException if an error occurs while purging
*/
public int purge() throws IOException {
var purgeOk = channel.queuePurge(this.name);
public int purge() {
var purgeOk = rabbitClient.queuePurge(this.name);
var deletedMessages = purgeOk.getMessageCount();
LOGGER.warn("Purged queue {} ({} messages deleted)", this.name, deletedMessages);
return deletedMessages;
}

/**
* @return the number of messages in this queue.
* @throws IOException if communication with RabbitMQ fails.
*/
public long messageCount() throws IOException {
return channel.messageCount(this.name);
public long messageCount() {
return rabbitClient.getMessageCount(this.name);
}

/**
Expand Down
Loading

0 comments on commit 7772fad

Please sign in to comment.