Skip to content

Commit

Permalink
[fix] Release EntryImpl while reading exchange topic (#1256)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 authored Jun 6, 2024
1 parent b71d821 commit 4940c75
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,30 @@

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;

import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.util.Backoff;

/**
* Amqp exchange replicator, read entries from BookKeeper and process entries.
Expand Down Expand Up @@ -64,11 +67,13 @@ protected enum State {
Stopped, Starting, Started, Stopping
}

private static final int defaultReadMaxSizeBytes = 5 * 1024 * 1024;
private int routeQueueSize = 200;
private volatile int pendingQueueSize = 0;
private static final AtomicIntegerFieldUpdater<AmqpExchangeReplicator> PENDING_SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AmqpExchangeReplicator.class, "pendingQueueSize");
private int readBatchSize;
private final int readMaxSizeBytes;
private final int readMaxBatchSize;

private static final int FALSE = 0;
private static final int TRUE = 1;
Expand All @@ -89,6 +94,10 @@ protected AmqpExchangeReplicator(PersistentExchange persistentExchange, Executor
this.scheduledExecutorService = topic.getBrokerService().executor();
this.initMaxRouteQueueSize(routeQueueSize);
this.routeExecutor = routeExecutor;
this.readMaxBatchSize = Math.min(this.routeQueueSize,
topic.getBrokerService().getPulsar().getConfig().getDispatcherMaxReadBatchSize());
this.readBatchSize = this.readMaxBatchSize;
this.readMaxSizeBytes = topic.getBrokerService().getPulsar().getConfig().getDispatcherMaxReadSizeBytes();
STATE_UPDATER.set(this, AmqpExchangeReplicator.State.Stopped);
this.name = "[AMQP Replicator for " + topic.getName() + " ]";
}
Expand Down Expand Up @@ -175,11 +184,16 @@ private void readMoreEntries() {
}
int availablePermits = getAvailablePermits();
if (availablePermits > 0) {
int messagesToRead = Math.min(availablePermits, readBatchSize);
// avoid messageToRead is 0
messagesToRead = Math.max(messagesToRead, 1);

if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
log.info("{} Schedule read of {} messages.", name, messagesToRead);
if (log.isDebugEnabled()) {
log.debug("{} Schedule read of {} messages.", name, availablePermits);
log.debug("{} Schedule read of {} messages.", name, messagesToRead);
}
cursor.asyncReadEntriesOrWait(availablePermits, defaultReadMaxSizeBytes, this, null, null);
cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, null);
} else {
if (log.isDebugEnabled()) {
log.debug("{} Not schedule read due to pending read. Messages to read {}.",
Expand All @@ -201,6 +215,23 @@ private int getAvailablePermits() {
}
return 0;
}

if (topic.getDispatchRateLimiter().isPresent()
&& topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
long availableOnByte = topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnByte();
long availableOnMsg = topic.getDispatchRateLimiter().get().getAvailableDispatchRateLimitOnMsg();
if (availableOnByte == 0 || availableOnMsg == 0) {
if (log.isDebugEnabled()) {
log.debug("{} Dispatch rate limit is reached, availableOnByte: {}, availableOnMsg: {}.",
name, availableOnByte, availableOnMsg);
}
return -1;
}
if (availableOnMsg > 0) {
availablePermits = Math.min(availablePermits, (int) availableOnMsg);
}
}

return availablePermits;
}

Expand All @@ -210,46 +241,83 @@ public void readEntriesComplete(List<Entry> list, Object o) {
log.debug("{} Read entries complete. Entries size: {}", name, list.size());
}
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
if (list == null || list.isEmpty()) {
if (CollectionUtils.isEmpty(list)) {
long delay = readFailureBackoff.next();
log.warn("{} The read entry list is empty, will retry in {} ms. ReadPosition: {}, LAC: {}.",
name, delay, cursor.getReadPosition(), topic.getManagedLedger().getLastConfirmedEntry());
scheduledExecutorService.schedule(this::readMoreEntries, delay, TimeUnit.MILLISECONDS);
return;
}

if (readBatchSize < readMaxBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, readMaxBatchSize);
if (log.isDebugEnabled()) {
log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize,
newReadBatchSize);
}

readBatchSize = newReadBatchSize;
}

readFailureBackoff.reduceToHalf();
List<Pair<PositionImpl, ByteBuf>> bufList = new ArrayList<>(list.size());
routeExecutor.execute(() -> this.handleEntries(list));
}

private void handleEntries(List<Entry> list) {
PENDING_SIZE_UPDATER.addAndGet(this, list.size());

List<Pair<Position, Map<String, Object>>> propsList = new ArrayList<>();
boolean encounterError = false;
for (Entry entry : list) {
bufList.add(
Pair.of(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()), entry.getDataBuffer()));
if (encounterError) {
entry.release();
continue;
}

Map<String, Object> props;
try {
MessageImpl<byte[]> message = MessageImpl.deserialize(entry.getDataBuffer());
props = message.getMessageBuilder().getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
} catch (Exception e) {
log.error("Failed to deserialize entry dataBuffer for topic: {}, rewind cursor.", name, e);
encounterError = true;
propsList.clear();
continue;
}

propsList.add(Pair.of(entry.getPosition(), props));
topic.getDispatchRateLimiter().ifPresent(
limiter -> limiter.consumeDispatchQuota(1, entry.getLength()));
entry.release();
}
if (encounterError) {
cursor.rewind();
PENDING_SIZE_UPDATER.set(this, 0);
this.readMoreEntries();
return;
}
routeExecutor.execute(() -> this.readComplete(bufList));
}

private void readComplete(List<Pair<PositionImpl, ByteBuf>> list) {
for (Pair<PositionImpl, ByteBuf> entry : list) {
PENDING_SIZE_UPDATER.incrementAndGet(this);
readProcess(entry.getRight(), entry.getLeft()).whenCompleteAsync((ignored, exception) -> {
for (var posAndProps : propsList) {
final Position position = posAndProps.getLeft();
routeIndex(posAndProps.getRight(), position).whenCompleteAsync((ignored, exception) -> {
if (exception != null) {
log.error("{} Error producing messages", name, exception);
this.cursor.rewind();
} else {
if (log.isDebugEnabled()) {
log.debug("{} Route message successfully.", name);
}
AmqpExchangeReplicator.this.cursor
.asyncDelete(entry.getLeft(), this, entry.getLeft());
AmqpExchangeReplicator.this.cursor.asyncDelete(position, this, position);
}
if (PENDING_SIZE_UPDATER.decrementAndGet(this) < routeQueueSize * 0.5
&& HAVE_PENDING_READ_UPDATER.get(this) == FALSE) {
if (PENDING_SIZE_UPDATER.decrementAndGet(this) <= 0) {
this.readMoreEntries();
}
}, routeExecutor);
entry.getRight().release();
}
}

public abstract CompletableFuture<Void> readProcess(ByteBuf data, Position position);
public abstract CompletableFuture<Void> routeIndex(Map<String, Object> props, Position position);

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object o) {
Expand All @@ -266,6 +334,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object o) {
log.debug("{} Read entries from bookie failed, retrying in {} s", name, waitTimeMs / 1000, exception);
}
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
scheduledExecutorService.schedule(this::readMoreEntries, waitTimeMs, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange;
import io.streamnative.pulsar.handlers.amqp.AmqpEntryWriter;
import io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator;
Expand All @@ -35,7 +34,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -51,9 +49,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -126,16 +122,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis
if (messageReplicator == null) {
messageReplicator = new AmqpExchangeReplicator(this, routeExecutor, routeQueueSize) {
@Override
public CompletableFuture<Void> readProcess(ByteBuf data, Position position) {
Map<String, Object> props;
try {
MessageImpl<byte[]> message = MessageImpl.deserialize(data);
props = message.getMessageBuilder().getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
} catch (Exception e) {
log.error("Failed to deserialize entry dataBuffer. exchangeName: {}", exchangeName, e);
return FutureUtil.failedFuture(e);
}
public CompletableFuture<Void> routeIndex(Map<String, Object> props, Position position) {

List<CompletableFuture<Void>> routeFutureList = new ArrayList<>();
if (exchangeType == Type.Direct) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}
});
when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
when(pulsarService.getConfig()).thenReturn(serviceConfiguration);
when(pulsarService.getOrderedExecutor()).thenReturn(
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build());
when(serviceConfiguration.getNumIOThreads()).thenReturn(2 * Runtime.getRuntime().availableProcessors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public abstract class AmqpProtocolHandlerTestBase {
@Getter
private List<Integer> aopAdminPortList = new ArrayList<>();

private final Integer inFlightSizeInMB = 5;

public AmqpProtocolHandlerTestBase() {
resetConfig();
}
Expand All @@ -128,6 +130,7 @@ protected void resetConfig() {
amqpConfig.setBrokerShutdownTimeoutMs(0L);
amqpConfig.setDefaultNumPartitions(1);
amqpConfig.setTransactionCoordinatorEnabled(true);
amqpConfig.setManagedLedgerMaxReadsInFlightSizeInMB(inFlightSizeInMB);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.streamnative.pulsar.handlers.amqp.rabbitmq;

import static org.junit.Assert.assertTrue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
Expand All @@ -33,9 +35,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -512,4 +516,38 @@ public void handleDelivery(String consumerTag,
channel.close();
conn.close();
}

@Test(timeOut = 15_000)
public void testConsumeMoreThanInFlightSize() throws Exception {
Connection conn = getConnection("vhost1", true);
Channel channel = conn.createChannel();

String qu = randQuName();
channel.queueDeclare(qu, true, false, false, null);

long totalContentSize = conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024 * 1024 * 2;
AtomicLong receiveContentSize = new AtomicLong(0);

CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume(qu, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
if (receiveContentSize.addAndGet(body.length) >= totalContentSize) {
latch.countDown();
}
}
});

AtomicLong sendContentSize = new AtomicLong(0);
byte[] content = RandomUtils.nextBytes(1024 * 100);
do {
channel.basicPublish("", qu, null, content);
} while (sendContentSize.addAndGet(content.length) < totalContentSize);

assertTrue(latch.await(10, TimeUnit.SECONDS));
channel.close();
conn.close();
}

}

0 comments on commit 4940c75

Please sign in to comment.