Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]Support DLQ, TTL,Delay message #929

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;

/**
* Base class of AMQP exchange.
*/
public abstract class AbstractAmqpExchange implements AmqpExchange {

@Getter
protected final Map<String, String> properties;
protected final String exchangeName;
protected final AmqpExchange.Type exchangeType;
protected Set<AmqpQueue> queues;
Expand All @@ -37,7 +40,8 @@ public abstract class AbstractAmqpExchange implements AmqpExchange {

protected AbstractAmqpExchange(String exchangeName, AmqpExchange.Type exchangeType,
Set<AmqpQueue> queues, boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments) {
Map<String, Object> arguments, Map<String, String> properties) {
this.properties = properties;
this.exchangeName = exchangeName;
this.exchangeType = exchangeType;
this.queues = queues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package io.streamnative.pulsar.handlers.amqp;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;

/**
* Base class for AMQP queue.
Expand All @@ -31,6 +33,9 @@ public abstract class AbstractAmqpQueue implements AmqpQueue {
protected boolean exclusive;
protected boolean autoDelete;
protected final Map<String, AmqpMessageRouter> routers = new ConcurrentHashMap<>();
protected final Map<String, Object> arguments = new HashMap<>();
@Getter
protected Map<String, String> properties;

protected AbstractAmqpQueue(String queueName, boolean durable, long connectionId) {
this.queueName = queueName;
Expand All @@ -42,12 +47,13 @@ protected AbstractAmqpQueue(String queueName, boolean durable, long connectionId

protected AbstractAmqpQueue(String queueName,
boolean durable, long connectionId,
boolean exclusive, boolean autoDelete) {
boolean exclusive, boolean autoDelete, Map<String, String> properties) {
this.queueName = queueName;
this.durable = durable;
this.connectionId = connectionId;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
this.properties = properties;
}

@Override
Expand All @@ -60,6 +66,20 @@ public boolean getDurable() {
return durable;
}

@Override
public boolean getExclusive() {
return exclusive;
}
@Override
public boolean getAutoDelete() {
return autoDelete;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public AmqpMessageRouter getRouter(String exchangeName) {
return routers.get(exchangeName);
Expand Down Expand Up @@ -133,4 +153,8 @@ public boolean isAutoDelete() {
return autoDelete;
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

import io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.pulsar.handlers.amqp.admin.AmqpAdmin;
import io.streamnative.pulsar.handlers.amqp.common.exception.AoPServiceRuntimeException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;

/**
* AMQP broker related.
Expand All @@ -43,16 +48,32 @@ public class AmqpBrokerService {
@Getter
private AmqpAdmin amqpAdmin;

@Getter
private final PulsarClient pulsarClient;

public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration config) {
try {
this.pulsarClient = PulsarClient.builder()
.serviceUrl(config.getAmqpServerAddress())
.ioThreads(config.getNumIOThreads())
.listenerThreads(config.getNumIOThreads())
.memoryLimit(0, SizeUnit.BYTES)
.statsInterval(0, TimeUnit.MILLISECONDS)
.connectionMaxIdleSeconds(-1)
.build();
} catch (PulsarClientException e) {
throw new AoPServiceRuntimeException(e);
}
String clusterName = pulsarService.getBrokerService().getPulsar().getConfiguration().getClusterName();
this.amqpAdmin = new AmqpAdmin(config.getAdvertisedAddress(), config.getAmqpAdminPort());
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService,
initRouteExecutor(config), config);
initRouteExecutor(config), config, amqpAdmin, pulsarClient);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, config);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer);
this.amqpAdmin = new AmqpAdmin("localhost", config.getAmqpAdminPort());
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer, amqpTopicManager);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer, config);
}

private ExecutorService initRouteExecutor(AmqpServiceConfiguration config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void receiveExchangeBound(AMQShortString exchange, AMQShortString routing
}

exchangeService.exchangeBound(
connection.getNamespaceName(), exchange.toString(), routingKey.toString(), queueName.toString())
connection.getNamespaceName(), exchange.toString(), routingKey.toString(), queueName.toString())
.thenAccept(replyCode -> {
String replyText = null;
switch (replyCode) {
Expand Down Expand Up @@ -253,18 +253,18 @@ public void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean d
channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
}
queueService.queueDeclare(connection.getNamespaceName(), queue.toString(), passive, durable, exclusive,
autoDelete, nowait, FieldTable.convertToMap(arguments), connection.getConnectionId())
autoDelete, nowait, FieldTable.convertToMap(arguments), connection.getConnectionId())
.thenAccept(amqpQueue -> {
setDefaultQueue(amqpQueue);
MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(
AMQShortString.createAMQShortString(amqpQueue.getName()), 0, 0);
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to declare queue {} in vhost {}", queue, connection.getNamespaceName(), t);
handleAoPException(t);
return null;
});
setDefaultQueue(amqpQueue);
MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(
AMQShortString.createAMQShortString(amqpQueue.getName()), 0, 0);
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to declare queue {} in vhost {}", queue, connection.getNamespaceName(), t);
handleAoPException(t);
return null;
});
}

@Override
Expand Down Expand Up @@ -361,7 +361,7 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM
public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) {
if (log.isDebugEnabled()) {
log.debug("RECV[{}] BasicQos[prefetchSize: {} prefetchCount: {} global: {}]",
channelId, prefetchSize, prefetchCount, global);
channelId, prefetchSize, prefetchCount, global);
}
if (prefetchSize > 0) {
closeChannel(ErrorCodes.NOT_IMPLEMENTED, "prefetchSize not supported ");
Expand Down Expand Up @@ -412,7 +412,7 @@ protected String getConsumerTag(AMQShortString consumerTag) {
}

private synchronized void subscribe(String consumerTag, String queueName, Topic topic,
boolean ack, boolean exclusive, boolean nowait) {
boolean ack, boolean exclusive, boolean nowait) {

CompletableFuture<Void> future = new CompletableFuture<>();
future.whenComplete((ignored, e) -> {
Expand All @@ -432,28 +432,28 @@ private synchronized void subscribe(String consumerTag, String queueName, Topic
CompletableFuture<Subscription> subscriptionFuture = topic.createSubscription(
defaultSubscription, CommandSubscribe.InitialPosition.Earliest, false, null);
subscriptionFuture.thenAccept(subscription -> {
AmqpConsumer consumer = new AmqpConsumer(queueContainer, subscription,
exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared,
topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
subscription.addConsumer(consumer).thenAccept(__ -> {
consumer.handleFlow(DEFAULT_CONSUMER_PERMIT);
tag2ConsumersMap.put(consumerTag, consumer);

if (!nowait) {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.
createBasicConsumeOkBody(AMQShortString.
createAMQShortString(consumer.getConsumerTag()));
connection.writeFrame(responseBody.generateFrame(channelId));
}
future.complete(null);
}).exceptionally(t -> {
future.completeExceptionally(t);
return null;
});
AmqpConsumer consumer = new AmqpConsumer(queueContainer, subscription,
exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared,
topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
subscription.addConsumer(consumer).thenAccept(__ -> {
consumer.handleFlow(DEFAULT_CONSUMER_PERMIT);
tag2ConsumersMap.put(consumerTag, consumer);

if (!nowait) {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.
createBasicConsumeOkBody(AMQShortString.
createAMQShortString(consumer.getConsumerTag()));
connection.writeFrame(responseBody.generateFrame(channelId));
}
future.complete(null);
}).exceptionally(t -> {
future.completeExceptionally(t);
return null;
});
}).exceptionally(t -> {
future.completeExceptionally(t);
return null;
Expand Down Expand Up @@ -740,17 +740,19 @@ public boolean ignoreAllButCloseOk() {
public void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue) {
if (log.isDebugEnabled()) {
log.debug("RECV[ {} ] BasicNAck[deliveryTag: {} multiple: {} requeue: {}]",
channelId, deliveryTag, multiple, requeue);
channelId, deliveryTag, multiple, requeue);
}
messageNAck(deliveryTag, multiple, requeue);
}

public void messageNAck(long deliveryTag, boolean multiple, boolean requeue) {
Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> ackedMessages =
unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
if (!ackedMessages.isEmpty()) {
if (requeue) {
requeue(ackedMessages);
} else {
discardMessage(ackedMessages);
}
} else {
closeChannel(ErrorCodes.IN_USE, "deliveryTag not found");
Expand All @@ -760,10 +762,21 @@ public void messageNAck(long deliveryTag, boolean multiple, boolean requeue) {
}
}

private void discardMessage(Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> messages) {
Map<UnacknowledgedMessageMap.MessageProcessor, List<PositionImpl>> positionMap = new HashMap<>();
messages.forEach(association -> {
UnacknowledgedMessageMap.MessageProcessor consumer = association.getConsumer();
List<PositionImpl> positions = positionMap.computeIfAbsent(consumer,
list -> new ArrayList<>());
positions.add((PositionImpl) association.getPosition());
});
positionMap.forEach(UnacknowledgedMessageMap.MessageProcessor::discardMessage);
}

@Override
public void receiveBasicRecover(boolean requeue, boolean sync) {
Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> ackedMessages =
unacknowledgedMessageMap.acknowledgeAll();
unacknowledgedMessageMap.acknowledgeAll();
if (!ackedMessages.isEmpty()) {
requeue(ackedMessages);
}
Expand All @@ -782,7 +795,7 @@ private void requeue(Collection<UnacknowledgedMessageMap.MessageConsumerAssociat
messages.stream().forEach(association -> {
UnacknowledgedMessageMap.MessageProcessor consumer = association.getConsumer();
List<PositionImpl> positions = positionMap.computeIfAbsent(consumer,
list -> new ArrayList<>());
list -> new ArrayList<>());
positions.add((PositionImpl) association.getPosition());
});
positionMap.entrySet().stream().forEach(entry -> {
Expand All @@ -806,7 +819,8 @@ private void messageAck(long deliveryTag, boolean multiple) {
entry.getConsumer().messageAck(entry.getPosition());
});
} else {
closeChannel(ErrorCodes.IN_USE, "deliveryTag not found");
// TODO
// closeChannel(ErrorCodes.IN_USE, "deliveryTag not found");
}
if (creditManager.hasCredit() && isBlockedOnCredit()) {
unBlockedOnCredit();
Expand Down Expand Up @@ -904,7 +918,7 @@ public void closeChannel(int cause, final String message) {
connection.closeChannelAndWriteFrame(this, cause, message);
}

public long getNextDeliveryTag() {
public synchronized long getNextDeliveryTag() {
return ++deliveryTag;
}

Expand Down Expand Up @@ -1015,16 +1029,16 @@ public AmqpFlowCreditManager getCreditManager() {

protected void handleAoPException(Throwable t) {
Throwable cause = FutureUtil.unwrapCompletionException(t);
if (!(cause instanceof AoPException)) {
if (!(cause instanceof AoPException exception)) {
connection.sendConnectionClose(INTERNAL_ERROR, t.getMessage(), channelId);
return;
}
AoPException exception = (AoPException) cause;
if (exception.isCloseChannel()) {
closeChannel(exception.getErrorCode(), exception.getMessage());
}
if (exception.isCloseConnection()) {
connection.sendConnectionClose(exception.getErrorCode(), exception.getMessage(), channelId);
} else if (exception.isCloseChannel()) {
closeChannel(exception.getErrorCode(), exception.getMessage());
} else {
connection.sendConnectionClose(INTERNAL_ERROR, exception.getMessage(), channelId);
}
}

Expand Down
Loading