diff --git a/ambry-api/src/main/java/com/github/ambry/config/RestServerConfig.java b/ambry-api/src/main/java/com/github/ambry/config/RestServerConfig.java index 3aa32b01eb..84d27c61bb 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/RestServerConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/RestServerConfig.java @@ -21,6 +21,8 @@ public class RestServerConfig { public static final String ENABLE_ADDED_CHANNEL_HANDLERS = CONFIG_PREFIX + "enable.added.channel.handlers"; public static final String ENABLE_STRUCTURED_LOGGING = CONFIG_PREFIX + "enable.structured.logging"; public static final String ENABLE_COMPOSITE_ROUTER = CONFIG_PREFIX + "enable.composite.router"; + private static final String PIPED_ASYNC_CHANNEL_SECONDARY_TIMEOUT_MS = + CONFIG_PREFIX + "piped.async.channel.secondary.timeout.ms"; /** * The RestRequestServiceFactory that needs to be used by the RestServer @@ -117,6 +119,13 @@ public class RestServerConfig { @Default("false") public final boolean restServerEnableAddedChannelHandlers; + /** + * Timeout when writing to secondary reader via PipedAsyncWritableChannel. + */ + @Config(PIPED_ASYNC_CHANNEL_SECONDARY_TIMEOUT_MS) + @Default("100") + public final int restServerPipedAsyncChannelSecondaryTimeoutMs; + /** * Set true to enable composite router in RestServer. */ @@ -149,5 +158,7 @@ public RestServerConfig(VerifiableProperties verifiableProperties) { restServerHealthCheckUri = verifiableProperties.getString("rest.server.health.check.uri", "/healthCheck"); restServerEnableAddedChannelHandlers = verifiableProperties.getBoolean(ENABLE_ADDED_CHANNEL_HANDLERS, false); restServerEnableCompositeRouter = verifiableProperties.getBoolean(ENABLE_COMPOSITE_ROUTER, false); + restServerPipedAsyncChannelSecondaryTimeoutMs = + verifiableProperties.getInt(PIPED_ASYNC_CHANNEL_SECONDARY_TIMEOUT_MS, 100); } } diff --git a/ambry-commons/src/main/java/com/github/ambry/commons/PipedAsyncWritableChannel.java b/ambry-commons/src/main/java/com/github/ambry/commons/PipedAsyncWritableChannel.java index b592e0b5bf..703afba199 100644 --- a/ambry-commons/src/main/java/com/github/ambry/commons/PipedAsyncWritableChannel.java +++ b/ambry-commons/src/main/java/com/github/ambry/commons/PipedAsyncWritableChannel.java @@ -14,16 +14,24 @@ */ package com.github.ambry.commons; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.router.AsyncWritableChannel; import com.github.ambry.router.FutureResult; import com.github.ambry.router.ReadableStreamChannel; +import com.github.ambry.utils.Utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Queue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -41,22 +49,33 @@ public class PipedAsyncWritableChannel implements AsyncWritableChannel { private final ReadableStreamChannel sourceChannel; + private final int secondaryTimeoutInMs; private final PipedReadableStreamChannel pipedPrimaryReadChannel; private PipedReadableStreamChannel pipedSecondaryReadChannel = null; private final ReentrantLock lock = new ReentrantLock(); private final AtomicBoolean channelOpen = new AtomicBoolean(true); private static final Logger logger = LoggerFactory.getLogger(PipedAsyncWritableChannel.class); + final static HashedWheelTimer wheel = + new HashedWheelTimer(new Utils.SchedulerThreadFactory("secondary-read-channel-timeout-handler-", false), 10, + TimeUnit.MILLISECONDS, 1024); + private final Metrics metrics; /** - * @param sourceChannel The channel that contains the stream of bytes to be read into primary and secondary destinations - * @param withSecondary if {@code true}, sends the bytes from source channel to secondary destination as well + * @param sourceChannel The channel that contains the stream of bytes to be read into primary and secondary + * destinations + * @param withSecondary if {@code true}, sends the bytes from source channel to secondary destination as well + * @param secondaryTimeoutInMs time in milliseconds after which secondary read channel will be closed + * @param metricRegistry */ - public PipedAsyncWritableChannel(ReadableStreamChannel sourceChannel, boolean withSecondary) { + public PipedAsyncWritableChannel(ReadableStreamChannel sourceChannel, boolean withSecondary, int secondaryTimeoutInMs, + MetricRegistry metricRegistry) { this.sourceChannel = sourceChannel; + this.secondaryTimeoutInMs = secondaryTimeoutInMs; pipedPrimaryReadChannel = new PipedReadableStreamChannel(true); if (withSecondary) { pipedSecondaryReadChannel = new PipedReadableStreamChannel(false); + wheel.start(); } sourceChannel.readInto(this, (result, exception) -> { @@ -68,6 +87,15 @@ public PipedAsyncWritableChannel(ReadableStreamChannel sourceChannel, boolean wi // Close this writable channel. It will close the piped readable channels as well. close(); }); + metrics = new Metrics(metricRegistry); + } + + /** + * Used in tests + * @return metrics collected in this class + */ + Metrics getMetrics() { + return metrics; } /** @@ -266,6 +294,7 @@ private class ChunkData { private Result secondaryWriteCallbackResult; private final Lock lock = new ReentrantLock(); private final AtomicBoolean chunkResolved = new AtomicBoolean(false); + private Timeout secondaryTimeout = null; /** * Create a new instance of ChunkData with the given parameters. @@ -313,6 +342,9 @@ private Callback makeCallbackForReadableStreamChannel(boolean primary, Ato } else { // This callback is coming from secondary reader secondaryWriteCallbackResult = new Result(result, exception); + if (secondaryTimeout != null) { + secondaryTimeout.cancel(); + } } if (PipedAsyncWritableChannel.this.pipedSecondaryReadChannel == null) { @@ -334,11 +366,23 @@ private Callback makeCallbackForReadableStreamChannel(boolean primary, Ato } resolveChunk(primaryWriteCallbackResult.bytesWritten, primaryWriteCallbackResult.exception); } else if (primaryWriteCallbackResult != null) { - // TODO: Write successful callback came from primary but not from secondary. We will "start a timer" to wait - // for result from secondary. If we time out waiting for the result, we will close secondary and send the - // remaining bytes to primary alone so that primary upload SLA is not affected. - } else { - // Do nothing. Callback from Secondary came. We will wait for callback to come from primary + long startTimeMs = System.currentTimeMillis(); + secondaryTimeout = wheel.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + logger.error("Closing secondary channel since it is unresponsive"); + // Measure difference between expected delay and actual delay. + long actualDelayInMs = System.currentTimeMillis() - startTimeMs; + metrics.secondaryTimeoutCorrectionTimeInMs.update( + Math.abs(actualDelayInMs - (long) secondaryTimeoutInMs)); + metrics.secondaryTimeOutCount.inc(); + // Close the secondary channel + PipedAsyncWritableChannel.this.closeSecondary(new ClosedChannelException()); + // Invoke write call back for waiting primary + ChunkData.this.resolveChunk(primaryWriteCallbackResult.bytesWritten, + primaryWriteCallbackResult.exception); + } + }, secondaryTimeoutInMs, TimeUnit.MILLISECONDS); } } } finally { @@ -357,4 +401,23 @@ private Result(long bytesWritten, Exception exception) { this.exception = exception; } } + + /** + * Metrics for {@link PipedAsyncWritableChannel}. + */ + static class Metrics { + public final Histogram secondaryTimeoutCorrectionTimeInMs; + public final Counter secondaryTimeOutCount; + + /** + * Constructor to create the metrics; + * @param registry + */ + public Metrics(MetricRegistry registry) { + secondaryTimeoutCorrectionTimeInMs = registry.histogram( + MetricRegistry.name(PipedAsyncWritableChannel.class, "SecondaryTimeoutCorrectionTimeInMs")); + secondaryTimeOutCount = + registry.counter(MetricRegistry.name(PipedAsyncWritableChannel.class, "SecondaryTimeOutCount")); + } + } } diff --git a/ambry-commons/src/test/java/com/github/ambry/commons/PipedAsyncWritableChannelTest.java b/ambry-commons/src/test/java/com/github/ambry/commons/PipedAsyncWritableChannelTest.java index f75a615ff7..840dd1b5a6 100644 --- a/ambry-commons/src/test/java/com/github/ambry/commons/PipedAsyncWritableChannelTest.java +++ b/ambry-commons/src/test/java/com/github/ambry/commons/PipedAsyncWritableChannelTest.java @@ -15,9 +15,11 @@ package com.github.ambry.commons; +import com.codahale.metrics.MetricRegistry; import com.github.ambry.router.AsyncWritableChannel; import com.github.ambry.router.ReadableStreamChannel; import com.github.ambry.utils.NettyByteBufLeakHelper; +import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -25,11 +27,13 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -41,6 +45,8 @@ public class PipedAsyncWritableChannelTest { private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + private final int secondaryTimeoutInMs = 500; + private final int secondaryTimeoutCorrectionDeltaInMs = 100; @Before public void before() { @@ -61,7 +67,7 @@ public void primaryReadTest() throws Exception { ByteBuffer content = ByteBuffer.wrap(fillRandomBytes(new byte[1024])); ByteBufferReadableStreamChannel sourceReadableStreamChannel = new ByteBufferReadableStreamChannel(content); PipedAsyncWritableChannel pipedAsyncWritableChannel = - new PipedAsyncWritableChannel(sourceReadableStreamChannel, false); + new PipedAsyncWritableChannel(sourceReadableStreamChannel, false, secondaryTimeoutInMs, new MetricRegistry()); ReadableStreamChannel pipedPrimaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); assertNotNull("Primary readable stream channel must not be null", pipedPrimaryReadableStreamChannel); @@ -78,7 +84,7 @@ public void primaryAndSecondaryReadTest() { ByteBuffer content = ByteBuffer.wrap(fillRandomBytes(new byte[1024])); ByteBufferReadableStreamChannel sourceReadableStreamChannel = new ByteBufferReadableStreamChannel(content); PipedAsyncWritableChannel pipedAsyncWritableChannel = - new PipedAsyncWritableChannel(sourceReadableStreamChannel, true); + new PipedAsyncWritableChannel(sourceReadableStreamChannel, true, secondaryTimeoutInMs, new MetricRegistry()); ReadableStreamChannel primaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); ReadableStreamChannel secondaryReadableStreamChannel = pipedAsyncWritableChannel.getSecondaryReadableStreamChannel(); @@ -110,7 +116,8 @@ public void primaryReadByteBufTest() throws Exception { byteBuf.retain(); // retain before it goes to readable stream channel try { PipedAsyncWritableChannel pipedAsyncWritableChannel = - new PipedAsyncWritableChannel(new ByteBufReadableStreamChannel(byteBuf), false); + new PipedAsyncWritableChannel(new ByteBufReadableStreamChannel(byteBuf), false, secondaryTimeoutInMs, + new MetricRegistry()); ReadableStreamChannel primaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); ByteBufferAsyncWritableChannel writableChannel = new ByteBufferAsyncWritableChannel(); primaryReadableStreamChannel.readInto(writableChannel, null); @@ -135,7 +142,8 @@ public void primaryAndSecondaryReadByteBufTest() throws Exception { byteBuf.retain(); // retain before it goes to readable stream channel try { PipedAsyncWritableChannel pipedAsyncWritableChannel = - new PipedAsyncWritableChannel(new ByteBufReadableStreamChannel(byteBuf), true); + new PipedAsyncWritableChannel(new ByteBufReadableStreamChannel(byteBuf), true, secondaryTimeoutInMs, + new MetricRegistry()); // Verify we are able to read contents from both primary and secondary readable channels ExecutorService executorService = Executors.newFixedThreadPool(2); @@ -144,15 +152,27 @@ public void primaryAndSecondaryReadByteBufTest() throws Exception { ReadableStreamChannel primaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); ByteBufferAsyncWritableChannel writableChannel = new ByteBufferAsyncWritableChannel(); - primaryReadableStreamChannel.readInto(writableChannel, null); + CountDownLatch latch = new CountDownLatch(1); + Callback contentReadCallback = new Callback() { + @Override + public void onCompletion(Long result, Exception exception) { + // 1. Verify primary succeeded with no exception + assertNull("No exception expected", exception); + assertEquals(1024, result.longValue()); + latch.countDown(); + } + }; + primaryReadableStreamChannel.readInto(writableChannel, contentReadCallback); ByteBuf obtained = writableChannel.getNextByteBuf(); // Make sure this is the same as original byte array obtained.retain(); - writableChannel.resolveOldestChunk(null); for (int i = 0; i < 1024; i++) { - assertEquals(byteBuf.getByte(i), obtained.getByte(i)); + assertEquals(byteBuf.getByte(i), obtained.readByte()); } + writableChannel.resolveOldestChunk(null); obtained.release(); + Thread.sleep(secondaryTimeoutInMs); + TestUtils.awaitLatchOrTimeout(latch, secondaryTimeoutCorrectionDeltaInMs); } catch (Exception e) { throw new RuntimeException(e); } @@ -160,31 +180,125 @@ public void primaryAndSecondaryReadByteBufTest() throws Exception { Future secondaryReadFuture = executorService.submit(() -> { try { - ReadableStreamChannel primaryReadableStreamChannel = + ReadableStreamChannel secondaryReadableStreamChannel = pipedAsyncWritableChannel.getSecondaryReadableStreamChannel(); ByteBufferAsyncWritableChannel writableChannel = new ByteBufferAsyncWritableChannel(); - primaryReadableStreamChannel.readInto(writableChannel, null); + CountDownLatch latch = new CountDownLatch(1); + Callback contentReadCallback = new Callback() { + @Override + public void onCompletion(Long result, Exception exception) { + // 1. Verify secondary succeeded with no exception + assertNull("No exception expected", exception); + assertEquals(1024, result.longValue()); + latch.countDown(); + } + }; + secondaryReadableStreamChannel.readInto(writableChannel, contentReadCallback); ByteBuf obtained = writableChannel.getNextByteBuf(); // Make sure this is the same as original byte array obtained.retain(); - writableChannel.resolveOldestChunk(null); for (int i = 0; i < 1024; i++) { - assertEquals(byteBuf.getByte(i), obtained.getByte(i)); + assertEquals(byteBuf.getByte(i), obtained.readByte()); } + writableChannel.resolveOldestChunk(null); obtained.release(); + Thread.sleep(secondaryTimeoutInMs); + TestUtils.awaitLatchOrTimeout(latch, secondaryTimeoutCorrectionDeltaInMs); } catch (Exception e) { throw new RuntimeException(e); } }); - primaryReadFuture.get(500, TimeUnit.MILLISECONDS); - secondaryReadFuture.get(500, TimeUnit.MILLISECONDS); + primaryReadFuture.get(secondaryTimeoutInMs + 250, TimeUnit.MILLISECONDS); + secondaryReadFuture.get(secondaryTimeoutInMs + 250, TimeUnit.MILLISECONDS); } finally { byteBuf.release(); assertEquals("Reference count of the original byte buf must be back to 0", 0, byteBuf.refCnt()); } } + @Test + public void secondaryTimeOutTest() throws ExecutionException, InterruptedException, TimeoutException { + + final int contentLength = 1024; + ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(contentLength); + byteBuf.writeBytes(fillRandomBytes(new byte[contentLength])); + byteBuf.retain(); // retain before it goes to readable stream channel + try { + PipedAsyncWritableChannel pipedAsyncWritableChannel = + new PipedAsyncWritableChannel(new ByteBufReadableStreamChannel(byteBuf), true, secondaryTimeoutInMs, + new MetricRegistry()); + + // Read contents from primary + ExecutorService executorService = Executors.newFixedThreadPool(2); + Future primaryReadFuture = executorService.submit(() -> { + try { + ReadableStreamChannel primaryReadableStreamChannel = + pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); + ByteBufferAsyncWritableChannel writableChannel = new ByteBufferAsyncWritableChannel(); + CountDownLatch latch = new CountDownLatch(1); + Callback contentReadCallback = new Callback() { + @Override + public void onCompletion(Long result, Exception exception) { + // 1. Verify primary succeeded with no exception + assertNull("No exception expected", exception); + assertEquals(contentLength, result.longValue()); + latch.countDown(); + } + }; + primaryReadableStreamChannel.readInto(writableChannel, contentReadCallback); + ByteBuf obtained = writableChannel.getNextByteBuf(); + // Make sure this is the same as original byte array + obtained.retain(); + for (int i = 0; i < contentLength; i++) { + assertEquals(byteBuf.getByte(i), obtained.readByte()); + } + writableChannel.resolveOldestChunk(null); + obtained.release(); + Thread.sleep(secondaryTimeoutInMs); + TestUtils.awaitLatchOrTimeout(latch, secondaryTimeoutCorrectionDeltaInMs); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Future secondaryReadFuture = executorService.submit(() -> { + try { + CountDownLatch latch = new CountDownLatch(1); + ReadableStreamChannel secondaryReadableStreamChannel = + pipedAsyncWritableChannel.getSecondaryReadableStreamChannel(); + ByteBufferAsyncWritableChannel writableChannel = new ByteBufferAsyncWritableChannel(); + Callback contentReadCallback = new Callback() { + @Override + public void onCompletion(Long result, Exception exception) { + // 2. Verify ClosedChannelException is called since this is a timeout case + assertNotNull("Exception must be sent", exception); + assertTrue("Expected closed channel exception", exception instanceof ClosedChannelException); + latch.countDown(); + } + }; + secondaryReadableStreamChannel.readInto(writableChannel, contentReadCallback); + + // Don't call resolveOldestChunk() on writableChannel + Thread.sleep(secondaryTimeoutInMs); + // Secondary should have been timed out and content read callback should have been invoked with exception + TestUtils.awaitLatchOrTimeout(latch, secondaryTimeoutCorrectionDeltaInMs); + // 3. Verify metrics + PipedAsyncWritableChannel.Metrics metrics = pipedAsyncWritableChannel.getMetrics(); + assertEquals("Expected secondary timeout count to be 1", 1, metrics.secondaryTimeOutCount.getCount()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + primaryReadFuture.get(secondaryTimeoutInMs + 250, TimeUnit.MILLISECONDS); + secondaryReadFuture.get(secondaryTimeoutInMs + 250, TimeUnit.MILLISECONDS); + } finally { + byteBuf.release(); + assertEquals("Reference count of the original byte buf mismatch", 0, byteBuf.refCnt()); + } + } + /** * Verify that secondary is closed with ClosedChannelException if primary fails */ @@ -193,7 +307,7 @@ public void primaryFailureTest() { ByteBuffer content = ByteBuffer.wrap(fillRandomBytes(new byte[1024])); ByteBufferReadableStreamChannel sourceReadableStreamChannel = new ByteBufferReadableStreamChannel(content); PipedAsyncWritableChannel pipedAsyncWritableChannel = - new PipedAsyncWritableChannel(sourceReadableStreamChannel, true); + new PipedAsyncWritableChannel(sourceReadableStreamChannel, true, secondaryTimeoutInMs, new MetricRegistry()); ReadableStreamChannel primaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); ReadableStreamChannel secondaryReadableStreamChannel = pipedAsyncWritableChannel.getSecondaryReadableStreamChannel(); @@ -231,7 +345,7 @@ public void secondaryFailureTest() { ByteBuffer content = ByteBuffer.wrap(fillRandomBytes(new byte[1024])); ByteBufferReadableStreamChannel sourceReadableStreamChannel = new ByteBufferReadableStreamChannel(content); PipedAsyncWritableChannel pipedAsyncWritableChannel = - new PipedAsyncWritableChannel(sourceReadableStreamChannel, true); + new PipedAsyncWritableChannel(sourceReadableStreamChannel, true, secondaryTimeoutInMs, new MetricRegistry()); ReadableStreamChannel primaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); ReadableStreamChannel secondaryReadableStreamChannel = pipedAsyncWritableChannel.getSecondaryReadableStreamChannel(); @@ -267,7 +381,8 @@ public void readIntoFailureTest() throws Exception { String errMsg = "@@ExpectedExceptionMessage@@"; byte[] in = fillRandomBytes(new byte[1]); ByteBufferReadableStreamChannel readableStreamChannel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(in)); - PipedAsyncWritableChannel pipedAsyncWritableChannel = new PipedAsyncWritableChannel(readableStreamChannel, false); + PipedAsyncWritableChannel pipedAsyncWritableChannel = + new PipedAsyncWritableChannel(readableStreamChannel, false, secondaryTimeoutInMs, new MetricRegistry()); ReadableStreamChannel primaryReadableStreamChannel = pipedAsyncWritableChannel.getPrimaryReadableStreamChannel(); // 1. Bad Async writable channel. diff --git a/ambry-utils/src/main/java/com/github/ambry/utils/Utils.java b/ambry-utils/src/main/java/com/github/ambry/utils/Utils.java index 6356ecd0eb..5fd8e5ab45 100644 --- a/ambry-utils/src/main/java/com/github/ambry/utils/Utils.java +++ b/ambry-utils/src/main/java/com/github/ambry/utils/Utils.java @@ -1444,7 +1444,7 @@ public static void closeQuietly(AutoCloseable resource) { * A thread factory to use for {@link ScheduledExecutorService}s instantiated using * {@link #newScheduler(int, String, boolean)}. */ - private static class SchedulerThreadFactory implements ThreadFactory { + public static class SchedulerThreadFactory implements ThreadFactory { private final AtomicInteger schedulerThreadId = new AtomicInteger(0); private final String threadNamePrefix; private final boolean isDaemon; @@ -1454,7 +1454,7 @@ private static class SchedulerThreadFactory implements ThreadFactory { * @param threadNamePrefix the prefix string for threads in this scheduler's thread pool. * @param isDaemon {@code true} if the created threads should be daemon threads. */ - SchedulerThreadFactory(String threadNamePrefix, boolean isDaemon) { + public SchedulerThreadFactory(String threadNamePrefix, boolean isDaemon) { this.threadNamePrefix = threadNamePrefix; this.isDaemon = isDaemon; }