Skip to content

Commit

Permalink
[all] Fix an infinite recursion issue in VeniceWriter (#884)
Browse files Browse the repository at this point in the history
Today, VeniceWriter (VW) has a bug that can send it into infinite recursion while it is
sending a message, which eventually results in a StackOverflowError.
For a particular partition, the conditions to trigger the issue are:

1. VW has a cached Segment
2. The segment is neither started nor ended
3. The elapsed time for the segment is greater than MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS

To fix it, we break the infinite recursion by always recording a new start time whenever a segment is started. A new integration test, as well as a unit test, is added to catch any regressions.
  • Loading branch information
lluwm authored Mar 1, 2024
1 parent 88f21a2 commit 602e6a2
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,9 @@ private Map<CharSequence, CharSequence> getDedupedDebugInfo(Map<CharSequence, Ch
}
return deduped;
}

/**
 * Overwrites the {@code started} flag of this segment with the given value.
 *
 * Only for testing: it lets a test put the segment into a specific started/not-started
 * state directly.
 *
 * @param started the value to assign to the {@code started} field
 */
public void setStarted(boolean started) {
this.started = started;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ public class VeniceWriter<K, V, U> extends AbstractVeniceWriter<K, V, U> {
*/
private final Segment[] segments;
/**
* Map of partition to its segment creation time in milliseconds.
* Map of partition to its segment start time in milliseconds.
* -1: the current segment is ended
*/
private final long[] segmentsCreationTimeArray;
private final long[] segmentsStartTimeArray;
private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
private final ChunkedValueManifestSerializer chunkedValueManifestSerializer =
new ChunkedValueManifestSerializer(true);
Expand Down Expand Up @@ -361,13 +361,13 @@ public VeniceWriter(
if (this.numberOfPartitions <= 0) {
throw new VeniceException("Invalid number of partitions: " + this.numberOfPartitions);
}
this.segmentsCreationTimeArray = new long[this.numberOfPartitions];
this.segmentsStartTimeArray = new long[this.numberOfPartitions];
// Prepare locks for all partitions instead of using map to avoid the searching and creation cost during
// ingestion.
this.partitionLocks = new Object[this.numberOfPartitions];
for (int i = 0; i < numberOfPartitions; i++) {
partitionLocks[i] = new Object();
segmentsCreationTimeArray[i] = -1L;
segmentsStartTimeArray[i] = -1L;
}
this.segments = new Segment[this.numberOfPartitions];
OPEN_VENICE_WRITER_COUNT.incrementAndGet();
Expand Down Expand Up @@ -1935,7 +1935,7 @@ private int getPartition(byte[] key) {
* @return the existing {@link Segment} associated with the requested partition, or
* a new one if none existed previously.
*/
private Segment getSegment(int partition, boolean sendEndOfSegment) {
Segment getSegment(int partition, boolean sendEndOfSegment) {
synchronized (this.partitionLocks[partition]) {
Segment currentSegment = segments[partition];
if (currentSegment == null || currentSegment.isEnded()) {
Expand All @@ -1945,9 +1945,9 @@ private Segment getSegment(int partition, boolean sendEndOfSegment) {
// timed out. The segment won't be closed if the ongoing message itself is
// an "end_of_segment" message.
if (!sendEndOfSegment) {
long currentSegmentCreationTime = segmentsCreationTimeArray[partition];
if (currentSegmentCreationTime != -1
&& LatencyUtils.getElapsedTimeInMs(currentSegmentCreationTime) > maxElapsedTimeForSegmentInMs) {
long currentSegmentStartTime = segmentsStartTimeArray[partition];
if (currentSegmentStartTime != -1
&& LatencyUtils.getElapsedTimeInMs(currentSegmentStartTime) > maxElapsedTimeForSegmentInMs) {
endSegment(partition, false);
currentSegment = startSegment(partition);
}
Expand All @@ -1972,20 +1972,18 @@ private Segment getSegment(int partition, boolean sendEndOfSegment) {
private Segment startSegment(int partition) {
synchronized (this.partitionLocks[partition]) {
Segment currentSegment = segments[partition];
long segmentStartTime = segmentsCreationTimeArray[partition];

if (currentSegment == null) {
currentSegment = new Segment(partition, 0, checkSumType);
segmentStartTime = time.getMilliseconds();
} else if (currentSegment.isEnded()) {
int newSegmentNumber = currentSegment.getSegmentNumber() + 1;
currentSegment = new Segment(partition, newSegmentNumber, checkSumType);
segmentStartTime = time.getMilliseconds();
}

if (!currentSegment.isStarted()) {
segments[partition] = currentSegment;
segmentsCreationTimeArray[partition] = segmentStartTime;
// Record the new start time of the segment.
segmentsStartTimeArray[partition] = time.getMilliseconds();
sendStartOfSegment(partition, null);
currentSegment.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;

import com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask;
import com.linkedin.davinci.kafka.consumer.LeaderProducerCallback;
Expand All @@ -32,6 +34,7 @@
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.validation.Segment;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.api.PubSubMessage;
Expand Down Expand Up @@ -560,4 +563,42 @@ public void testVeniceWriterCloseRetry(boolean gracefulClose) throws ExecutionEx
// Verify that the close(false) method will be called twice.
verify(mockedProducer, times(2)).close(anyString(), anyInt(), eq(false));
}

/**
 * Regression test for the VeniceWriter issue where the writer could run into infinite
 * recursion, eventually run out of stack space and throw a StackOverflowError.
 *
 * The conditions to trigger this issue are:
 * 1. The VeniceWriter's cached segment is neither started nor ended.
 * 2. The elapsed time for the segment is greater than MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS.
 *
 * The test reproduces both conditions against a mocked producer: it fetches the segment for
 * partition 0, forces its "started" flag back to false, waits longer than the configured
 * maximum segment age and then sends a start-of-segment control message.
 */
@Test(timeOut = 10 * Time.MS_PER_SECOND)
public void testVeniceWriterShouldNotCauseStackOverflowError() {
  PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
  CompletableFuture mockedFuture = mock(CompletableFuture.class);
  when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())).thenReturn(mockedFuture);

  // Use the smallest possible segment age (1 ms) so that a short sleep is enough to exceed it.
  Properties writerProperties = new Properties();
  writerProperties.put(VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, 1);
  VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder("test").setPartitionCount(1).build();

  try (VeniceWriter<Object, Object, Object> writer =
      new VeniceWriter<>(veniceWriterOptions, new VeniceProperties(writerProperties), mockedProducer)) {
    Segment seg = writer.getSegment(0, false);
    seg.setStarted(false);

    // Verify that segment is neither started nor ended.
    assertFalse(seg.isStarted());
    assertFalse(seg.isEnded());

    // Sleep for 0.1 second to make sure the elapsed time for the segment is greater than
    // MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS.
    Thread.sleep(100);

    // Send an SOS control message to the topic and it should not cause StackOverflowError.
    writer.sendStartOfSegment(0, null);
  } catch (Throwable t) {
    // StackOverflowError is an Error, not an Exception, hence the catch of Throwable. This clause
    // also receives failures from writer construction, the assertions above, Thread.sleep and the
    // implicit close(), so the message must not single out one particular call.
    fail("VeniceWriter should not cause StackOverflowError or any other failure in this scenario", t);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE;
import static com.linkedin.venice.utils.Time.MS_PER_SECOND;
import static com.linkedin.venice.writer.VeniceWriter.MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -13,6 +15,7 @@
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.validation.Segment;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory;
Expand Down Expand Up @@ -227,4 +230,51 @@ public void testVeniceWriterClose(boolean doFlush) {
}
}
}

/**
 * Regression coverage for the VeniceWriter defect in which sending a message could recurse
 * without bound until the stack was exhausted and a StackOverflowError was thrown.
 *
 * Two conditions together trigger the defect:
 * 1. The segment cached by the VeniceWriter is neither started nor ended.
 * 2. More than MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS has elapsed for that segment.
 *
 * The writer here is created through the writer factory and pointed at the broker exposed by
 * {@code pubSubBrokerWrapper}, on a freshly created single-partition topic.
 */
@Test(timeOut = 30 * MS_PER_SECOND)
public void testVeniceWriterShouldNotCauseStackOverflow() {
  final String topic = TestUtils.getUniqueTopicString("topic-for-vw-stack-overflow");
  final int numPartitions = 1;
  PubSubTopic topicHandle = pubSubTopicRepository.getTopic(topic);
  topicManager.createTopic(topicHandle, numPartitions, 1, true);

  Properties writerProps = new Properties();
  writerProps.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress());
  writerProps.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName());
  // Cap the segment age at 1000 ms (1 second) so the sleep below is guaranteed to exceed it.
  writerProps.put(MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, 1000);
  writerProps.putAll(PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper)));

  try (VeniceWriter<KafkaKey, byte[], byte[]> writer =
      TestUtils.getVeniceWriterFactory(writerProps, pubSubProducerAdapterFactory)
          .createVeniceWriter(
              new VeniceWriterOptions.Builder(topic).setUseKafkaKeySerializer(true)
                  .setPartitionCount(numPartitions)
                  .build())) {
    // Fetch the segment for partition 0 and flip its "started" flag back off.
    Segment cachedSegment = writer.getSegment(0, false);
    cachedSegment.setStarted(false);

    // Precondition 1: the cached segment is neither started nor ended.
    assertFalse(cachedSegment.isStarted());
    assertFalse(cachedSegment.isEnded());

    // Precondition 2: wait 1.1 seconds so the segment is older than
    // MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS.
    Thread.sleep(1100);

    // Sending a start-of-segment control message must now complete without a StackOverflowError.
    writer.sendStartOfSegment(0, null);
  } catch (Throwable t) {
    Assert.fail("VeniceWriter should not cause stack overflow", t);
  }
}
}

0 comments on commit 602e6a2

Please sign in to comment.