Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically recover from channel-level exception #516

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
- Automatically recover from channel-level exceptions. This involves a breaking change in the constructor of `FlusswerkConsumer`, which now requires a `RabbitClient` instead of a `Channel`.

## [6.0.1] - 2023-12-06

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ services:
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- 5672:5672
- 15672:15672
- "5672:5672"
- "15672:15672"
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Tracing tracing() {
public Flow flow(Optional<FlowSpec> flowSpec) {
// No FlowSpec → no Flow. We will have to handle this case when creating the
// Engine bean as the sole consumer of the Flow bean.
return flowSpec.map(spec -> new Flow(spec)).orElse(null);
return flowSpec.map(Flow::new).orElse(null);
}

@Bean
Expand Down Expand Up @@ -147,7 +147,7 @@ public FlusswerkMetrics metrics(
public List<FlusswerkConsumer> flusswerkConsumers(
FlusswerkObjectMapper flusswerkObjectMapper,
ProcessingProperties processingProperties,
RabbitConnection rabbitConnection,
RabbitClient rabbitClient,
RoutingProperties routingProperties,
PriorityBlockingQueue<Task> taskQueue) {
int maxPriority = routingProperties.getIncoming().size();
Expand All @@ -161,7 +161,7 @@ public List<FlusswerkConsumer> flusswerkConsumers(
flusswerkConsumers.add(
new FlusswerkConsumer(
availableWorkers,
rabbitConnection.getChannel(),
rabbitClient,
flusswerkObjectMapper,
queueName,
priority,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.dbmdz.flusswerk.framework.engine;

import com.github.dbmdz.flusswerk.framework.flow.Flow;
import com.github.dbmdz.flusswerk.framework.rabbitmq.ChannelListener;
import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -17,7 +18,7 @@
* Run flows {@link Flow} for every message from the {@link RabbitClient} - usually several in
* parallel.
*/
public class Engine {
public class Engine implements ChannelListener {

private static final Logger LOGGER = LoggerFactory.getLogger(Engine.class);

Expand Down Expand Up @@ -71,6 +72,9 @@ public void start() {
for (FlusswerkConsumer consumer : consumers) {
rabbitClient.consume(consumer, false);
}

// keep track of channel resets
this.rabbitClient.addChannelListener(this);
}

/**
Expand All @@ -93,7 +97,7 @@ public void stop() {
List<Task> remainingTasks = new ArrayList<>();
taskQueue.drainTo(remainingTasks);

// NACK and requeue all messages that have not be processed yet
// NACK and requeue all messages that have not been processed yet
for (var task : remainingTasks) {
long deliveryTag = task.getMessage().getEnvelope().getDeliveryTag();
try {
Expand All @@ -115,4 +119,12 @@ public void stop() {
LOGGER.error("Timeout awaiting worker shutdown after 5 minutes", e);
}
}

@Override
public void handleReset() {
LOGGER.debug("Register consumers again after channel reset");
for (FlusswerkConsumer consumer : consumers) {
rabbitClient.consume(consumer, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import com.github.dbmdz.flusswerk.framework.jackson.FlusswerkObjectMapper;
import com.github.dbmdz.flusswerk.framework.model.IncomingMessageType;
import com.github.dbmdz.flusswerk.framework.model.Message;
import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
Expand All @@ -21,14 +19,15 @@
* Receive AMQP message from a RabbitMQ queue, deserialize the Flusswerk {@link Message} and put
* that into the internal task queue.
*/
public class FlusswerkConsumer extends DefaultConsumer {
public class FlusswerkConsumer implements Consumer {

public static final FlusswerkObjectMapper FALLBACK_MAPPER =
new FlusswerkObjectMapper(new IncomingMessageType());
private static final Logger LOGGER = LoggerFactory.getLogger(FlusswerkConsumer.class);
private volatile String _consumerTag;

private final Semaphore availableWorkers;
private final Channel channel;
private final RabbitClient rabbitClient;
private final FlusswerkObjectMapper flusswerkObjectMapper;
private final PriorityBlockingQueue<Task> taskQueue;
private final int priority;
Expand All @@ -37,38 +36,65 @@ public class FlusswerkConsumer extends DefaultConsumer {
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
* @param rabbitClient the client which handles communication with RabbitMQ
* @param flusswerkObjectMapper the object mapper to deserialize messages
* @param inputQueue the rabbitMQ queue this consumer is bound to
*/
public FlusswerkConsumer(
Semaphore availableWorkers,
Channel channel,
RabbitClient rabbitClient,
FlusswerkObjectMapper flusswerkObjectMapper,
String inputQueue,
int priority,
PriorityBlockingQueue<Task> taskQueue) {
super(channel);
this.availableWorkers = availableWorkers;
this.channel = channel;
this.rabbitClient = rabbitClient;
this.flusswerkObjectMapper = flusswerkObjectMapper;
this.inputQueue = inputQueue;
this.priority = priority;
this.taskQueue = taskQueue;
}

@Override
public void handleConsumeOk(String consumerTag) {
this._consumerTag = consumerTag;
}

@Override
public void handleCancelOk(String consumerTag) {
// nothing to do
}

@Override
public void handleCancel(String consumerTag) {
// nothing to do
}

@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// nothing to do
}

@Override
public void handleRecoverOk(String consumerTag) {
// nothing to do
}

public String getConsumerTag() {
return this._consumerTag;
}

@Override
public void handleDelivery(
String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) {

try {
availableWorkers.acquire();
} catch (InterruptedException e) {
// If waiting for the semaphore is interrupted (e.g. because of shutdown), the current message
// should not be processed at all.
LOGGER.warn("FlusswerkConsumer interrupted while waiting for free worker", e);
channel.basicReject(envelope.getDeliveryTag(), true);
rabbitClient.reject(envelope, true);
return;
}

Expand All @@ -90,7 +116,7 @@ public void handleDelivery(
if (tracing != null) {
LOGGER.error("Could not deserialize message", kv("tracing", tracing), e);
}
channel.basicAck(envelope.getDeliveryTag(), false);
rabbitClient.ack(envelope.getDeliveryTag());
availableWorkers.release();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import java.io.IOException;

public interface ChannelCommand<T> {
T execute() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.rabbitmq.client.*;
import java.util.Map;

public class ChannelCommands {

private final Channel channel;

public ChannelCommands(Channel channel) {
this.channel = channel;
}

public ChannelCommand<Void> basicPublish(
String exchange, String routingKey, AMQP.BasicProperties properties, byte[] data) {
return () -> {
channel.basicPublish(exchange, routingKey, properties, data);
return null;
};
}

public ChannelCommand<Void> basicAck(long deliveryTag, boolean multiple) {
return () -> {
channel.basicAck(deliveryTag, multiple);
return null;
};
}

public ChannelCommand<Void> basicReject(long deliveryTag, boolean requeue) {
return () -> {
channel.basicReject(deliveryTag, requeue);
return null;
};
}

public ChannelCommand<GetResponse> basicGet(String queue, boolean autoAck) {
return () -> channel.basicGet(queue, autoAck);
}

public ChannelCommand<Void> basicConsume(String queue, boolean autoAck, Consumer consumer) {
return () -> {
channel.basicConsume(queue, autoAck, consumer);
return null;
};
}

public ChannelCommand<Void> exchangeDeclare(
String exchange, BuiltinExchangeType type, boolean durable) {
return () -> {
channel.exchangeDeclare(exchange, type, durable);
return null;
};
}

public ChannelCommand<Void> queueDeclare(
String name,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> args) {
return () -> {
channel.queueDeclare(name, durable, exclusive, autoDelete, args);
return null;
};
}

public ChannelCommand<Void> queueBind(String name, String exchange, String routingKey) {
return () -> {
channel.queueBind(name, exchange, routingKey);
return null;
};
}

public ChannelCommand<Long> messageCount(String queue) {
return () -> channel.messageCount(queue);
}

public ChannelCommand<AMQP.Queue.PurgeOk> queuePurge(String queue) {
return () -> channel.queuePurge(queue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

/** A ChannelListener receives notifications about channel recovery. */
public interface ChannelListener {
void handleReset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ public Message receive(String queueName, boolean autoAck) throws InvalidMessageE
* MessageBroker#ack(Message)}.
*
* @return the received message.
* @throws IOException if communication with RabbitMQ failed.
*/
public Message receive() throws IOException {
public Message receive() {
Message message = null;
for (String inputQueue : routingConfig.getIncoming()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException;
import com.github.dbmdz.flusswerk.framework.model.Message;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
Expand All @@ -17,34 +15,30 @@ public class Queue {
private static final Logger LOGGER = LoggerFactory.getLogger(Queue.class);

private final String name;
private final Channel channel;
private final RabbitClient rabbitClient;

Queue(String name, RabbitClient rabbitClient) {
this.name = requireNonNull(name);
this.channel = rabbitClient.getChannel();
this.rabbitClient = rabbitClient;
}

/**
* Removes all messages from a queue. These messages cannot be restored.
*
* @return the number of deleted messages
* @throws IOException if an error occurs while purging
*/
public int purge() throws IOException {
var purgeOk = channel.queuePurge(this.name);
public int purge() {
var purgeOk = rabbitClient.queuePurge(this.name);
var deletedMessages = purgeOk.getMessageCount();
LOGGER.warn("Purged queue {} ({} messages deleted)", this.name, deletedMessages);
return deletedMessages;
}

/**
* @return the number of messages in this queue.
* @throws IOException if communication with RabbitMQ fails.
*/
public long messageCount() throws IOException {
return channel.messageCount(this.name);
public long messageCount() {
return rabbitClient.getMessageCount(this.name);
}

/**
Expand Down
Loading