Detect expired messages in Kafka. Log and set a gauge. (#12608)
vrajat authored Mar 9, 2024
1 parent c2d7846 commit b11d53c
Showing 5 changed files with 51 additions and 4 deletions.

Changed file 1 of 5:

@@ -112,7 +112,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),
LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false);
LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
STREAM_DATA_LOSS("streamDataLoss", false);

private final String _meterName;
private final String _unit;

Changed file 2 of 5:

@@ -456,6 +456,11 @@ protected boolean consumeLoop()
throw t;
}

+ StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset();
+ if (batchFirstOffset != null) {
+ validateStartOffset(_currentOffset, batchFirstOffset);
+ }
+
boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);

if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
@@ -900,6 +905,24 @@ public Map<String, PartitionLagState> getPartitionToLagState(
return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
}

+ /**
+ * Checks if the begin offset of the stream partition has been fast-forwarded.
+ * batchFirstOffset should be less than or equal to startOffset.
+ * If batchFirstOffset is greater, then some messages were not received.
+ *
+ * @param startOffset The offset of the first message desired, inclusive.
+ * @param batchFirstOffset The offset of the first message in the batch.
+ */
+ private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
+ if (batchFirstOffset.compareTo(startOffset) > 0) {
+ _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+ String message =
+ "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+ _segmentLogger.error(message);
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), message, null));
+ }
+ }
+
public StreamPartitionMsgOffset getCurrentOffset() {
return _currentOffset;
}
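
To make the new check concrete, the following is a minimal, self-contained sketch of the comparison that validateStartOffset performs, using plain longs in place of StreamPartitionMsgOffset. The class name, method name, and the retention scenario (offsets 100 and 150) are illustrative assumptions, not part of this commit.

// Illustrative sketch only: mirrors the validateStartOffset comparison with plain
// longs instead of StreamPartitionMsgOffset. All names and offsets here are made up.
public class OffsetGapCheckSketch {

  // True when the batch starts after the requested offset, i.e. messages were lost
  // (mirrors batchFirstOffset.compareTo(startOffset) > 0 in the real check).
  static boolean detectDataLoss(long startOffset, long batchFirstOffset) {
    return batchFirstOffset > startOffset;
  }

  public static void main(String[] args) {
    // The consumer wants to resume from offset 100, but retention has already expired
    // everything below 150, so the first polled record is at offset 150.
    long requestedStart = 100L;
    long firstInBatch = 150L;
    if (detectDataLoss(requestedStart, firstInBatch)) {
      // In the real code this path bumps ServerMeter.STREAM_DATA_LOSS, logs an error,
      // and records a SegmentErrorInfo for the consuming segment.
      System.out.println("Data loss: requested " + requestedStart
          + " but the batch starts at " + firstInBatch);
    }

    // Normal case: the batch starts at (or before) the requested offset, nothing is reported.
    System.out.println("No loss: " + !detectDataLoss(100L, 100L));
  }
}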

Changed file 3 of 5:

@@ -31,19 +31,22 @@
public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
private final List<StreamMessage<byte[]>> _messageList;
private final int _unfilteredMessageCount;
+ private final long _firstOffset;
private final long _lastOffset;
private final StreamMessageMetadata _lastMessageMetadata;

/**
* @param unfilteredMessageCount how many messages were received from the topic before being filtered
+ * @param firstOffset the offset of the first message in the batch
* @param lastOffset the offset of the last message in the batch
* @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
* @param lastMessageMetadata metadata for last filtered message in the batch, useful for estimating ingestion delay
* when a batch has all messages filtered.
*/
- public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch,
- StreamMessageMetadata lastMessageMetadata) {
+ public KafkaMessageBatch(int unfilteredMessageCount, long firstOffset, long lastOffset,
+ List<StreamMessage<byte[]>> batch, StreamMessageMetadata lastMessageMetadata) {
_messageList = batch;
+ _firstOffset = firstOffset;
_lastOffset = lastOffset;
_unfilteredMessageCount = unfilteredMessageCount;
_lastMessageMetadata = lastMessageMetadata;
@@ -111,4 +114,9 @@ public byte[] getMessageBytesAtIndex(int index) {
public StreamMessage getStreamMessage(int index) {
return _messageList.get(index);
}

+ @Override
+ public StreamPartitionMsgOffset getFirstMessageOffset() {
+ return new LongMsgOffset(_firstOffset);
+ }
}
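
As a rough usage sketch, the new five-argument constructor and accessor might be exercised as below. The package imports are assumptions about where these classes live (only the simple class names appear in this diff), the wrapper class name is invented, and the offsets describe a made-up poll in which five records (offsets 150-154) were fetched but all of them were filtered out.

import java.util.Collections;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch; // assumed package location
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;   // assumed package location

public class KafkaMessageBatchUsageSketch {
  public static void main(String[] args) {
    // Five records were polled (offsets 150..154) but every row was filtered out,
    // so the message list is empty and there is no last-message metadata.
    KafkaMessageBatch batch =
        new KafkaMessageBatch(5, 150L, 154L, Collections.emptyList(), null);

    // The new accessor reports where the fetched batch actually starts, so the
    // ingestion path can compare it against the offset it asked for.
    StreamPartitionMsgOffset firstOffset = batch.getFirstMessageOffset();
    System.out.println("batch starts at " + firstOffset);
  }
}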

Changed file 4 of 5:

@@ -69,8 +69,12 @@ public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long start
ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
+ long firstOffset = startOffset;
long lastOffset = startOffset;
StreamMessageMetadata rowMetadata = null;
+ if (!messageAndOffsets.isEmpty()) {
+ firstOffset = messageAndOffsets.get(0).offset();
+ }
for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
long offset = messageAndOffset.offset();
_lastFetchedOffset = offset;
@@ -90,6 +94,6 @@ public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long start
endOffset);
}
}
- return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered, rowMetadata);
+ return new KafkaMessageBatch(messageAndOffsets.size(), firstOffset, lastOffset, filtered, rowMetadata);
}
}

Changed file 5 of 5:

@@ -118,6 +118,17 @@ default boolean isEndOfPartitionGroup() {
return false;
}

+ /**
+ * Return the offset of the first message in the batch.
+ * The first offset of the batch is useful to determine if there were gaps in the stream.
+ *
+ * @return null by default
+ */
+ @Nullable
+ default public StreamPartitionMsgOffset getFirstMessageOffset() {
+ return null;
+ }
+
/**
* This is useful while determining ingestion delay for a message batch. Retaining metadata for last filtered message
* in a batch can enable us to estimate the ingestion delay for the batch.
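
Because the default implementation returns null, the new consumeLoop check is simply skipped for stream plugins that do not report a first offset, and a plugin opts in by overriding the method, as the Kafka batch does above. The toy sketch below illustrates that opt-in pattern; all names are invented for the sketch and an OptionalLong stands in for the nullable return, so none of this is Pinot API.

import java.util.OptionalLong;

// Toy illustration of the opt-in pattern behind getFirstMessageOffset(); all names invented.
public class DefaultMethodOptInSketch {

  // Analog of MessageBatch: reporting a first offset is optional.
  interface Batch {
    // Existing implementations inherit this default and report nothing.
    default OptionalLong firstOffset() {
      return OptionalLong.empty();
    }
  }

  // A batch that opts in, the way KafkaMessageBatch does after this commit.
  static final class KafkaLikeBatch implements Batch {
    private final long _firstOffset;

    KafkaLikeBatch(long firstOffset) {
      _firstOffset = firstOffset;
    }

    @Override
    public OptionalLong firstOffset() {
      return OptionalLong.of(_firstOffset);
    }
  }

  // Analog of the guard added to consumeLoop: only validate when an offset is reported.
  static void consume(Batch batch) {
    if (batch.firstOffset().isPresent()) {
      System.out.println("validate start offset against " + batch.firstOffset().getAsLong());
    } else {
      System.out.println("no first offset reported; gap check skipped");
    }
  }

  public static void main(String[] args) {
    consume(new KafkaLikeBatch(150L)); // opts in: the check runs
    consume(new Batch() { });          // legacy batch: the check is skipped
  }
}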
