Skip to content

Commit

Permalink
Add timeouts for secondary readers in piped async writable channel (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun-LinkedIn authored Oct 24, 2024
1 parent 94dec49 commit 6f692bd
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class RestServerConfig {
public static final String ENABLE_ADDED_CHANNEL_HANDLERS = CONFIG_PREFIX + "enable.added.channel.handlers";
public static final String ENABLE_STRUCTURED_LOGGING = CONFIG_PREFIX + "enable.structured.logging";
public static final String ENABLE_COMPOSITE_ROUTER = CONFIG_PREFIX + "enable.composite.router";
private static final String PIPED_ASYNC_CHANNEL_SECONDARY_TIMEOUT_MS =
CONFIG_PREFIX + "piped.async.channel.secondary.timeout.ms";

/**
* The RestRequestServiceFactory that needs to be used by the RestServer
Expand Down Expand Up @@ -117,6 +119,13 @@ public class RestServerConfig {
@Default("false")
public final boolean restServerEnableAddedChannelHandlers;

/**
* Timeout when writing to secondary reader via PipedAsyncWritableChannel.
*/
@Config(PIPED_ASYNC_CHANNEL_SECONDARY_TIMEOUT_MS)
@Default("100")
public final int restServerPipedAsyncChannelSecondaryTimeoutMs;

/**
* Set true to enable composite router in RestServer.
*/
Expand Down Expand Up @@ -149,5 +158,7 @@ public RestServerConfig(VerifiableProperties verifiableProperties) {
restServerHealthCheckUri = verifiableProperties.getString("rest.server.health.check.uri", "/healthCheck");
restServerEnableAddedChannelHandlers = verifiableProperties.getBoolean(ENABLE_ADDED_CHANNEL_HANDLERS, false);
restServerEnableCompositeRouter = verifiableProperties.getBoolean(ENABLE_COMPOSITE_ROUTER, false);
restServerPipedAsyncChannelSecondaryTimeoutMs =
verifiableProperties.getInt(PIPED_ASYNC_CHANNEL_SECONDARY_TIMEOUT_MS, 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,24 @@
*/
package com.github.ambry.commons;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.FutureResult;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.utils.Utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
Expand All @@ -41,22 +49,33 @@
public class PipedAsyncWritableChannel implements AsyncWritableChannel {

private final ReadableStreamChannel sourceChannel;
private final int secondaryTimeoutInMs;
private final PipedReadableStreamChannel pipedPrimaryReadChannel;
private PipedReadableStreamChannel pipedSecondaryReadChannel = null;
private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean channelOpen = new AtomicBoolean(true);
private static final Logger logger = LoggerFactory.getLogger(PipedAsyncWritableChannel.class);
final static HashedWheelTimer wheel =
new HashedWheelTimer(new Utils.SchedulerThreadFactory("secondary-read-channel-timeout-handler-", false), 10,
TimeUnit.MILLISECONDS, 1024);
private final Metrics metrics;

/**
* @param sourceChannel The channel that contains the stream of bytes to be read into primary and secondary destinations
* @param withSecondary if {@code true}, sends the bytes from source channel to secondary destination as well
* @param sourceChannel The channel that contains the stream of bytes to be read into primary and secondary
* destinations
* @param withSecondary if {@code true}, sends the bytes from source channel to secondary destination as well
* @param secondaryTimeoutInMs time in milliseconds after which secondary read channel will be closed
* @param metricRegistry
*/
public PipedAsyncWritableChannel(ReadableStreamChannel sourceChannel, boolean withSecondary) {
public PipedAsyncWritableChannel(ReadableStreamChannel sourceChannel, boolean withSecondary, int secondaryTimeoutInMs,
MetricRegistry metricRegistry) {
this.sourceChannel = sourceChannel;
this.secondaryTimeoutInMs = secondaryTimeoutInMs;

pipedPrimaryReadChannel = new PipedReadableStreamChannel(true);
if (withSecondary) {
pipedSecondaryReadChannel = new PipedReadableStreamChannel(false);
wheel.start();
}

sourceChannel.readInto(this, (result, exception) -> {
Expand All @@ -68,6 +87,15 @@ public PipedAsyncWritableChannel(ReadableStreamChannel sourceChannel, boolean wi
// Close this writable channel. It will close the piped readable channels as well.
close();
});
metrics = new Metrics(metricRegistry);
}

/**
* Used in tests
* @return metrics collected in this class
*/
Metrics getMetrics() {
return metrics;
}

/**
Expand Down Expand Up @@ -266,6 +294,7 @@ private class ChunkData {
private Result secondaryWriteCallbackResult;
private final Lock lock = new ReentrantLock();
private final AtomicBoolean chunkResolved = new AtomicBoolean(false);
private Timeout secondaryTimeout = null;

/**
* Create a new instance of ChunkData with the given parameters.
Expand Down Expand Up @@ -313,6 +342,9 @@ private Callback<Long> makeCallbackForReadableStreamChannel(boolean primary, Ato
} else {
// This callback is coming from secondary reader
secondaryWriteCallbackResult = new Result(result, exception);
if (secondaryTimeout != null) {
secondaryTimeout.cancel();
}
}

if (PipedAsyncWritableChannel.this.pipedSecondaryReadChannel == null) {
Expand All @@ -334,11 +366,23 @@ private Callback<Long> makeCallbackForReadableStreamChannel(boolean primary, Ato
}
resolveChunk(primaryWriteCallbackResult.bytesWritten, primaryWriteCallbackResult.exception);
} else if (primaryWriteCallbackResult != null) {
// TODO: Write successful callback came from primary but not from secondary. We will "start a timer" to wait
// for result from secondary. If we time out waiting for the result, we will close secondary and send the
// remaining bytes to primary alone so that primary upload SLA is not affected.
} else {
// Do nothing. Callback from Secondary came. We will wait for callback to come from primary
long startTimeMs = System.currentTimeMillis();
secondaryTimeout = wheel.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
logger.error("Closing secondary channel since it is unresponsive");
// Measure difference between expected delay and actual delay.
long actualDelayInMs = System.currentTimeMillis() - startTimeMs;
metrics.secondaryTimeoutCorrectionTimeInMs.update(
Math.abs(actualDelayInMs - (long) secondaryTimeoutInMs));
metrics.secondaryTimeOutCount.inc();
// Close the secondary channel
PipedAsyncWritableChannel.this.closeSecondary(new ClosedChannelException());
// Invoke write call back for waiting primary
ChunkData.this.resolveChunk(primaryWriteCallbackResult.bytesWritten,
primaryWriteCallbackResult.exception);
}
}, secondaryTimeoutInMs, TimeUnit.MILLISECONDS);
}
}
} finally {
Expand All @@ -357,4 +401,23 @@ private Result(long bytesWritten, Exception exception) {
this.exception = exception;
}
}

/**
* Metrics for {@link PipedAsyncWritableChannel}.
*/
static class Metrics {
public final Histogram secondaryTimeoutCorrectionTimeInMs;
public final Counter secondaryTimeOutCount;

/**
* Constructor to create the metrics;
* @param registry
*/
public Metrics(MetricRegistry registry) {
secondaryTimeoutCorrectionTimeInMs = registry.histogram(
MetricRegistry.name(PipedAsyncWritableChannel.class, "SecondaryTimeoutCorrectionTimeInMs"));
secondaryTimeOutCount =
registry.counter(MetricRegistry.name(PipedAsyncWritableChannel.class, "SecondaryTimeOutCount"));
}
}
}
Loading

0 comments on commit 6f692bd

Please sign in to comment.