diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java index 8e33f87a04..b9ee11d58e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java @@ -407,4 +407,9 @@ private Map getDedupedDebugInfo(Map extends AbstractVeniceWriter { */ private final Segment[] segments; /** - * Map of partition to its segment creation time in milliseconds. + * Map of partition to its segment start time in milliseconds. * -1: the current segment is ended */ - private final long[] segmentsCreationTimeArray; + private final long[] segmentsStartTimeArray; private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); private final ChunkedValueManifestSerializer chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true); @@ -361,13 +361,13 @@ public VeniceWriter( if (this.numberOfPartitions <= 0) { throw new VeniceException("Invalid number of partitions: " + this.numberOfPartitions); } - this.segmentsCreationTimeArray = new long[this.numberOfPartitions]; + this.segmentsStartTimeArray = new long[this.numberOfPartitions]; // Prepare locks for all partitions instead of using map to avoid the searching and creation cost during // ingestion. this.partitionLocks = new Object[this.numberOfPartitions]; for (int i = 0; i < numberOfPartitions; i++) { partitionLocks[i] = new Object(); - segmentsCreationTimeArray[i] = -1L; + segmentsStartTimeArray[i] = -1L; } this.segments = new Segment[this.numberOfPartitions]; OPEN_VENICE_WRITER_COUNT.incrementAndGet(); @@ -1935,7 +1935,7 @@ private int getPartition(byte[] key) { * @return the existing {@link Segment} associated with the requested partition, or * a new one if none existed previously.
*/ - private Segment getSegment(int partition, boolean sendEndOfSegment) { + Segment getSegment(int partition, boolean sendEndOfSegment) { synchronized (this.partitionLocks[partition]) { Segment currentSegment = segments[partition]; if (currentSegment == null || currentSegment.isEnded()) { @@ -1945,9 +1945,9 @@ private Segment getSegment(int partition, boolean sendEndOfSegment) { // timed out. The segment won't be closed if the ongoing message itself is // an "end_of_segment" message. if (!sendEndOfSegment) { - long currentSegmentCreationTime = segmentsCreationTimeArray[partition]; - if (currentSegmentCreationTime != -1 - && LatencyUtils.getElapsedTimeInMs(currentSegmentCreationTime) > maxElapsedTimeForSegmentInMs) { + long currentSegmentStartTime = segmentsStartTimeArray[partition]; + if (currentSegmentStartTime != -1 + && LatencyUtils.getElapsedTimeInMs(currentSegmentStartTime) > maxElapsedTimeForSegmentInMs) { endSegment(partition, false); currentSegment = startSegment(partition); } @@ -1972,20 +1972,18 @@ private Segment getSegment(int partition, boolean sendEndOfSegment) { private Segment startSegment(int partition) { synchronized (this.partitionLocks[partition]) { Segment currentSegment = segments[partition]; - long segmentStartTime = segmentsCreationTimeArray[partition]; if (currentSegment == null) { currentSegment = new Segment(partition, 0, checkSumType); - segmentStartTime = time.getMilliseconds(); } else if (currentSegment.isEnded()) { int newSegmentNumber = currentSegment.getSegmentNumber() + 1; currentSegment = new Segment(partition, newSegmentNumber, checkSumType); - segmentStartTime = time.getMilliseconds(); } if (!currentSegment.isStarted()) { segments[partition] = currentSegment; - segmentsCreationTimeArray[partition] = segmentStartTime; + // Record the new start time of the segment.
+ segmentsStartTimeArray[partition] = time.getMilliseconds(); sendStartOfSegment(partition, null); currentSegment.start(); } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index 6d99884dc8..067ac7aa59 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -20,6 +20,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; import com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask; import com.linkedin.davinci.kafka.consumer.LeaderProducerCallback; @@ -32,6 +34,7 @@ import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.kafka.validation.Segment; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.api.PubSubMessage; @@ -560,4 +563,42 @@ public void testVeniceWriterCloseRetry(boolean gracefulClose) throws ExecutionEx // Verify that the close(false) method will be called twice. verify(mockedProducer, times(2)).close(anyString(), anyInt(), eq(false)); } + + /** + * This is a regression test for the VeniceWriter issue where the VeniceWriter could recurse + * infinitely, eventually exhausting the stack space and throwing a StackOverflowError. + * + * The conditions to trigger this issue are: + * 1. The VeniceWriter's cached segment is neither started nor ended. + * 2. The elapsed time for the segment is greater than MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS.
+ */ + @Test(timeOut = 10 * Time.MS_PER_SECOND) + public void testVeniceWriterShouldNotCauseStackOverflowError() { + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + CompletableFuture mockedFuture = mock(CompletableFuture.class); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture); + + Properties writerProperties = new Properties(); + writerProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, 1); + VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder("test").setPartitionCount(1).build(); + + try (VeniceWriter writer = + new VeniceWriter<>(veniceWriterOptions, new VeniceProperties(writerProperties), mockedProducer)) { + Segment seg = writer.getSegment(0, false); + seg.setStarted(false); + + // Verify that segment is neither started nor ended. + assertFalse(seg.isStarted()); + assertFalse(seg.isEnded()); + + // Sleep for 0.1 second to make sure the elapsed time for the segment is greater than + // MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS. + Thread.sleep(100); + + // Send an SOS control message to the topic and it should not cause StackOverflowError.
+ writer.sendStartOfSegment(0, null); + } catch (Throwable t) { + fail("VeniceWriter should not cause StackOverflowError", t); + } + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java index 90cccaeffe..c53e11a0e3 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java @@ -2,6 +2,8 @@ import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE; import static com.linkedin.venice.utils.Time.MS_PER_SECOND; +import static com.linkedin.venice.writer.VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -13,6 +15,7 @@ import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.ProducerMetadata; import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.kafka.validation.Segment; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; @@ -227,4 +230,51 @@ public void testVeniceWriterClose(boolean doFlush) { } } } + + /** + * This is a regression test for the VeniceWriter issue where the VeniceWriter could recurse + * infinitely, eventually exhausting the stack space and throwing a StackOverflowError. + * + * The conditions to trigger this issue are: + * 1. The VeniceWriter's cached segment is neither started nor ended. + * 2. The elapsed time for the segment is greater than MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS.
+ */ + @Test(timeOut = 30 * MS_PER_SECOND) + public void testVeniceWriterShouldNotCauseStackOverflow() { + String topicName = TestUtils.getUniqueTopicString("topic-for-vw-stack-overflow"); + int partitionCount = 1; + PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName); + + topicManager.createTopic(pubSubTopic, partitionCount, 1, true); + Properties properties = new Properties(); + properties.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress()); + properties.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName()); + + // Explicitly set MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS to 1 second. + properties.put(MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, 1000); + properties.putAll(PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper))); + + try (VeniceWriter veniceWriter = + TestUtils.getVeniceWriterFactory(properties, pubSubProducerAdapterFactory) + .createVeniceWriter( + new VeniceWriterOptions.Builder(topicName).setUseKafkaKeySerializer(true) + .setPartitionCount(partitionCount) + .build())) { + Segment seg = veniceWriter.getSegment(0, false); + seg.setStarted(false); + + // Verify that segment is neither started nor ended. + assertFalse(seg.isStarted()); + assertFalse(seg.isEnded()); + + // Sleep for 1.1 seconds to make sure the elapsed time for the segment is greater than + // MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS. + Thread.sleep(1100); + + // Send an SOS control message to the topic and it should not cause StackOverflowError. + veniceWriter.sendStartOfSegment(0, null); + } catch (Throwable t) { + Assert.fail("VeniceWriter should not cause stack overflow", t); + } + } }