From b11d53cbc5966f1daaf7c6b025cbdc0c53446f61 Mon Sep 17 00:00:00 2001
From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com>
Date: Fri, 8 Mar 2024 20:29:15 -0800
Subject: [PATCH] Detect expired messages in Kafka. Log and set a gauge.
 (#12608)

---
 .../pinot/common/metrics/ServerMeter.java        |  3 ++-
 .../realtime/RealtimeSegmentDataManager.java     | 23 +++++++++++++++++++
 .../stream/kafka20/KafkaMessageBatch.java        | 12 ++++++++--
 .../kafka20/KafkaPartitionLevelConsumer.java     |  6 ++++-
 .../apache/pinot/spi/stream/MessageBatch.java    | 11 +++++++++
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index c516740070f..e8819ca9459 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -112,7 +112,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),
   LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
   TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
-  LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false);
+  LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
+  STREAM_DATA_LOSS("streamDataLoss", false);
 
   private final String _meterName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 2a5da62c2a1..49cddb55749 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -456,6 +456,11 @@ protected boolean consumeLoop()
         throw t;
       }
 
+      StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset();
+      if (batchFirstOffset != null) {
+        validateStartOffset(_currentOffset, batchFirstOffset);
+      }
+
       boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
       if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
@@ -900,6 +905,24 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the begin offset of the stream partition has been fast-forwarded.
+   * batchFirstOffset should be less than or equal to startOffset.
+   * If batchFirstOffset is greater, then some messages were not received.
+   *
+   * @param startOffset The offset of the first message desired, inclusive.
+   * @param batchFirstOffset The offset of the first message in the batch.
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
+    if (batchFirstOffset.compareTo(startOffset) > 0) {
+      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+      String message =
+          "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+      _segmentLogger.error(message);
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), message, null));
+    }
+  }
+
   public StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index dbc3e8d2a69..005b4c27b3a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -31,19 +31,22 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
 
   private final List<StreamMessage<byte[]>> _messageList;
   private final int _unfilteredMessageCount;
+  private final long _firstOffset;
   private final long _lastOffset;
   private final StreamMessageMetadata _lastMessageMetadata;
 
   /**
    * @param unfilteredMessageCount how many messages were received from the topic before being filtered
+   * @param firstOffset the offset of the first message in the batch
    * @param lastOffset the offset of the last message in the batch
    * @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
    * @param lastMessageMetadata metadata for last filtered message in the batch, useful for estimating ingestion delay
    *                            when a batch has all messages filtered.
    */
-  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch,
-      StreamMessageMetadata lastMessageMetadata) {
+  public KafkaMessageBatch(int unfilteredMessageCount, long firstOffset, long lastOffset,
+      List<StreamMessage<byte[]>> batch, StreamMessageMetadata lastMessageMetadata) {
     _messageList = batch;
+    _firstOffset = firstOffset;
     _lastOffset = lastOffset;
     _unfilteredMessageCount = unfilteredMessageCount;
     _lastMessageMetadata = lastMessageMetadata;
@@ -111,4 +114,9 @@ public byte[] getMessageBytesAtIndex(int index) {
   public StreamMessage<byte[]> getStreamMessage(int index) {
     return _messageList.get(index);
   }
+
+  @Override
+  public StreamPartitionMsgOffset getFirstMessageOffset() {
+    return new LongMsgOffset(_firstOffset);
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index ff90f4b1a3f..df51d2fda9a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -69,8 +69,12 @@ public synchronized MessageBatch<byte[]> fetchMessages(long start
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
     List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
+    long firstOffset = startOffset;
     long lastOffset = startOffset;
     StreamMessageMetadata rowMetadata = null;
+    if (!messageAndOffsets.isEmpty()) {
+      firstOffset = messageAndOffsets.get(0).offset();
+    }
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
       long offset = messageAndOffset.offset();
       _lastFetchedOffset = offset;
@@ -90,6 +94,6 @@ public synchronized MessageBatch<byte[]> fetchMessages(long start
           endOffset);
       }
     }
-    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered, rowMetadata);
+    return new KafkaMessageBatch(messageAndOffsets.size(), firstOffset, lastOffset, filtered, rowMetadata);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 8bde04aed44..9a8f4e15fc0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -118,6 +118,17 @@ default boolean isEndOfPartitionGroup() {
     return false;
   }
 
+  /**
+   * Return the offset of the first message in the batch.
+   * The first offset of the batch is useful to determine if there were gaps in the stream.
+   *
+   * @return null by default
+   */
+  @Nullable
+  default StreamPartitionMsgOffset getFirstMessageOffset() {
+    return null;
+  }
+
   /**
    * This is useful while determining ingestion delay for a message batch. Retaining metadata for last filtered message
    * in a batch can enable us to estimate the ingestion delay for the batch.