Skip to content

Commit

Permalink
[server] Fail-Fast on Corrupted OffsetRecord (#1150)
Browse files Browse the repository at this point in the history
Bug fix for `reportError()` not being called when `PartitionConsumptionState` is `null` due to a corrupted `OffsetRecord`. ⚾️
  • Loading branch information
KaiSernLim authored Sep 6, 2024
1 parent a0d5909 commit a7bcbd5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED = MINUTES.toMillis(1); // 1 min

private static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5;
static final int MAX_CONSUMER_ACTION_ATTEMPTS = 5;
private static final int CONSUMER_ACTION_QUEUE_INIT_CAPACITY = 11;
protected static final long KILL_WAIT_TIME_MS = 5000L;
private static final int MAX_KILL_CHECKING_ATTEMPTS = 10;
Expand Down Expand Up @@ -1626,7 +1626,7 @@ protected void closeVeniceViewWriters() {
/**
* Consumes the kafka actions messages in the queue.
*/
private void processConsumerActions(Store store) throws InterruptedException {
void processConsumerActions(Store store) throws InterruptedException {
Instant startTime = Instant.now();
for (;;) {
// Do not want to remove a message from the queue unless it has been processed.
Expand Down Expand Up @@ -1674,7 +1674,11 @@ private void processConsumerActions(Store store) throws InterruptedException {
if (consumerActionsQueue.remove(action)) {
partitionToPendingConsumerActionCountMap.get(action.getPartition()).decrementAndGet();
}
if (state != null && !state.isCompletionReported()) {
/**
* {@code state} can be null if the {@link OffsetRecord} from {@code storageMetadataService} was corrupted in
* {@link #processCommonConsumerAction}, so the {@link PartitionConsumptionState} was never created.
*/
if (state == null || !state.isCompletionReported()) {
reportError(
"Error when processing consumer action: " + action,
action.getPartition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4854,6 +4854,33 @@ public void testMeasureLagWithCallToPubSub() {
"If the partition has messages in it, and we consumed some of them, we expect lag to equal the unconsumed message count.");
}

/**
 * Verifies that a corrupted {@link OffsetRecord} causes the ingestion task to report an error for the partition.
 *
 * <p>When SIT encounters a corrupted {@link OffsetRecord} in {@link StoreIngestionTask#processCommonConsumerAction}
 * and {@link StorageMetadataService#getLastOffset} throws an exception due to a deserialization error,
 * {@link StoreIngestionTask#reportError(String, int, Exception)} should be called in order to trigger a Helix
 * state transition without waiting 24+ hours for the Helix state transition timeout.
 *
 * <p>The test stubs {@code getLastOffset} to throw, invokes {@code processConsumerActions}
 * {@link StoreIngestionTask#MAX_CONSUMER_ACTION_ATTEMPTS} times, and then asserts that {@code reportError} was
 * called at least once for {@code PARTITION_FOO} with an exception whose message ends with the stubbed message.
 *
 * @throws Exception if {@code runTest} fails
 */
@Test
public void testProcessConsumerActionsError() throws Exception {
  runTest(Collections.singleton(PARTITION_FOO), () -> {
    // This is an actual exception thrown when deserializing a corrupted OffsetRecord
    String msg = "Received Magic Byte '6' which is not supported by InternalAvroSpecificSerializer. "
        + "The only supported Magic Byte for this implementation is '24'.";
    doThrow(new VeniceMessageException(msg)).when(mockStorageMetadataService).getLastOffset(any(), anyInt());

    // NOTE(review): presumably the repeated calls are what exhaust the consumer action's retry budget so that
    // the error path is reached; confirm against processConsumerActions.
    try {
      for (int i = 0; i < StoreIngestionTask.MAX_CONSUMER_ACTION_ATTEMPTS; i++) {
        storeIngestionTaskUnderTest.processConsumerActions(storeAndVersionConfigsUnderTest.store);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag before wrapping, so the interruption is not lost to callers up the stack.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }

    ArgumentCaptor<VeniceException> captor = ArgumentCaptor.forClass(VeniceException.class);
    verify(storeIngestionTaskUnderTest, atLeastOnce()).reportError(anyString(), eq(PARTITION_FOO), captor.capture());
    // The reported exception must carry the original deserialization failure message at its tail.
    assertTrue(captor.getValue().getMessage().endsWith(msg));
  }, AA_OFF);
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
// mock the store config
Expand Down

0 comments on commit a7bcbd5

Please sign in to comment.