Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
schmika committed Dec 14, 2023
1 parent 30bb4d5 commit 5c79d61
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 192 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.dbmdz.flusswerk.framework.engine;

import com.github.dbmdz.flusswerk.framework.flow.Flow;
import com.github.dbmdz.flusswerk.framework.rabbitmq.ChannelListener;
import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void stop() {
List<Task> remainingTasks = new ArrayList<>();
taskQueue.drainTo(remainingTasks);

// NACK and requeue all messages that have not be processed yet
// NACK and requeue all messages that have not been processed yet
for (var task : remainingTasks) {
long deliveryTag = task.getMessage().getEnvelope().getDeliveryTag();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import java.io.IOException;

public interface ChannelCommand<T> {
T execute() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.github.dbmdz.flusswerk.framework.rabbitmq;

import com.rabbitmq.client.*;
import java.util.Map;

public class ChannelCommands {

private final Channel channel;

public ChannelCommands(Channel channel) {
this.channel = channel;
}

public ChannelCommand<Void> basicPublish(
String exchange, String routingKey, AMQP.BasicProperties properties, byte[] data) {
return () -> {
channel.basicPublish(exchange, routingKey, properties, data);
return null;
};
}

public ChannelCommand<Void> basicAck(long deliveryTag, boolean multiple) {
return () -> {
channel.basicAck(deliveryTag, multiple);
return null;
};
}

public ChannelCommand<Void> basicReject(long deliveryTag, boolean requeue) {
return () -> {
channel.basicReject(deliveryTag, requeue);
return null;
};
}

public ChannelCommand<GetResponse> basicGet(String queue, boolean autoAck) {
return () -> channel.basicGet(queue, autoAck);
}

public ChannelCommand<Void> basicConsume(String queue, boolean autoAck, Consumer consumer) {
return () -> {
channel.basicConsume(queue, autoAck, consumer);
return null;
};
}

public ChannelCommand<Void> exchangeDeclare(
String exchange, BuiltinExchangeType type, boolean durable) {
return () -> {
channel.exchangeDeclare(exchange, type, durable);
return null;
};
}

public ChannelCommand<Void> queueDeclare(
String name,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> args) {
return () -> {
channel.queueDeclare(name, durable, exclusive, autoDelete, args);
return null;
};
}

public ChannelCommand<Void> queueBind(String name, String exchange, String routingKey) {
return () -> {
channel.queueBind(name, exchange, routingKey);
return null;
};
}

public ChannelCommand<Long> messageCount(String queue) {
return () -> channel.messageCount(queue);
}

public ChannelCommand<AMQP.Queue.PurgeOk> queuePurge(String queue) {
return () -> channel.queuePurge(queue);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.github.dbmdz.flusswerk.framework.engine;
package com.github.dbmdz.flusswerk.framework.rabbitmq;

/** A ChannelListener receives notifications about channel recovery. */
public interface ChannelListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.github.dbmdz.flusswerk.framework.exceptions.InvalidMessageException;
import com.github.dbmdz.flusswerk.framework.model.Message;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
Expand Down Expand Up @@ -38,7 +37,7 @@ public int purge() {
/**
* @return the number of messages in this queue.
*/
public long messageCount() throws IOException {
public long messageCount() {
return rabbitClient.getMessageCount(this.name);
}

Expand Down
Loading

0 comments on commit 5c79d61

Please sign in to comment.