diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index fd31d8f72b4..d001ef27251 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.data.manager.realtime; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.time.Clock; @@ -37,15 +38,21 @@ import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A Class to track realtime ingestion delay for table partitions on a given server. * Highlights: @@ -83,22 +90,37 @@ * * TODO: handle bug situations like the one where a partition is not allocated to a given server due to a bug. 
*/ - public class IngestionDelayTracker { - private static class IngestionInfo { - final long _ingestionTimeMs; - final long _firstStreamIngestionTimeMs; - final StreamPartitionMsgOffset _currentOffset; - final StreamPartitionMsgOffset _latestOffset; + public static final int MAX_OFFSET_FETCH_WAIT_TIME_MS = 5000; - IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs, - @Nullable StreamPartitionMsgOffset currentOffset, @Nullable StreamPartitionMsgOffset latestOffset) { + private static class IngestionInfo { + final StreamMetadataProvider _streamMetadataProvider; + volatile Long _ingestionTimeMs; + volatile Long _firstStreamIngestionTimeMs; + volatile StreamPartitionMsgOffset _currentOffset; + volatile StreamPartitionMsgOffset _latestOffset; + + IngestionInfo(Long ingestionTimeMs, Long firstStreamIngestionTimeMs, StreamPartitionMsgOffset currentOffset, + StreamMetadataProvider streamMetadataProvider) { _ingestionTimeMs = ingestionTimeMs; _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; _currentOffset = currentOffset; + _streamMetadataProvider = streamMetadataProvider; + } + + void updateCurrentOffset(StreamPartitionMsgOffset currentOffset) { + _currentOffset = currentOffset; + } + + void updateLatestOffset(StreamPartitionMsgOffset latestOffset) { _latestOffset = latestOffset; } + + void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) { + _ingestionTimeMs = ingestionTimeMs; + _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; + } } private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class); @@ -112,6 +134,13 @@ private static class IngestionInfo { // Cache expire time for ignored segment if there is no update from the segment. 
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10; + public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable"; + public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval"; + + // Since offset lag metric does a call to Kafka, we want to make sure we don't do it too frequently. + public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true; + public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute + public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L; // Per partition info for all partitions active for the current table. private final Map _ingestionInfoMap = new ConcurrentHashMap<>(); @@ -120,7 +149,7 @@ private static class IngestionInfo { // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons. // TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay} - // instead. 
+ // instead private final Map _partitionsMarkedForVerification = new ConcurrentHashMap<>(); private final Cache _segmentsToIgnore = @@ -139,10 +168,16 @@ private static class IngestionInfo { private Clock _clock; + // Configuration parameters + private final boolean _enableOffsetLagMetric; + private final long _offsetLagUpdateIntervalMs; + + private final StreamConsumerFactory _streamConsumerFactory; + @VisibleForTesting public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs, - Supplier isServerReadyToServeQueries) + Supplier isServerReadyToServeQueries, TableConfig tableConfig) throws RuntimeException { _serverMetrics = serverMetrics; _tableNameWithType = tableNameWithType; @@ -150,6 +185,27 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy _realTimeTableDataManager = realtimeTableDataManager; _clock = Clock.systemUTC(); _isServerReadyToServeQueries = isServerReadyToServeQueries; + + StreamConfig streamConfig = + new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(tableConfig)); + _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + + if (realtimeTableDataManager.getInstanceDataManagerConfig() != null + && realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() != null) { + PinotConfiguration pinotConfiguration = realtimeTableDataManager.getInstanceDataManagerConfig().getConfig(); + _enableOffsetLagMetric = + pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, DEFAULT_ENABLE_OFFSET_LAG_METRIC); + _offsetLagUpdateIntervalMs = pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, + DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS); + + Preconditions.checkArgument(_offsetLagUpdateIntervalMs > MIN_OFFSET_LAG_UPDATE_INTERVAL, + String.format("Value of Offset lag update interval config: %s must be greater than %d", + 
OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, MIN_OFFSET_LAG_UPDATE_INTERVAL)); + } else { + _enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC; + _offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS; + } + // Handle negative timer values if (scheduledExecutorThreadTickIntervalMs <= 0) { throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s", @@ -171,12 +227,36 @@ public Thread newThread(Runnable r) { _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions, INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS); + + if (_enableOffsetLagMetric) { + _scheduledExecutor.scheduleWithFixedDelay(this::updateLatestOffsets, + 0, _offsetLagUpdateIntervalMs, TimeUnit.MILLISECONDS); + } } public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, - RealtimeTableDataManager tableDataManager, Supplier isServerReadyToServeQueries) { + RealtimeTableDataManager tableDataManager, Supplier isServerReadyToServeQueries, + TableConfig tableConfig) { this(serverMetrics, tableNameWithType, tableDataManager, SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS, - isServerReadyToServeQueries); + isServerReadyToServeQueries, tableConfig); + } + + private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs, + StreamMetadataProvider streamMetadataProvider) { + try { + return streamMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, maxWaitTimeMs); + } catch (Exception e) { + LOGGER.debug("Caught exception while fetching stream offset", e); + } + return null; + } + + /** + * Creates a new stream metadata provider + */ + private StreamMetadataProvider createPartitionMetadataProvider(String reason, String clientId, int partitionGroupId) { + LOGGER.info("Creating new partition metadata provider, reason: {}", reason); + return _streamConsumerFactory.createPartitionMetadataProvider(clientId, 
partitionGroupId); } /* @@ -210,6 +290,14 @@ private void removePartitionId(int partitionId) { _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS); _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); + + if (v._streamMetadataProvider != null) { + try { + v._streamMetadataProvider.close(); + } catch (Exception e) { + LOGGER.warn("Caught exception while closing stream metadata provider for partitionId: {}", partitionId, e); + } + } } return null; }); @@ -253,18 +341,16 @@ void setClock(Clock clock) { * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from * {@link RowMetadata}) * @param currentOffset offset of the last consumed message (from {@link RowMetadata}) - * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider}) */ public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs, - long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset, - @Nullable StreamPartitionMsgOffset latestOffset) { + long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) { if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) { // Do not update the ingestion delay metrics during server startup period // or once the table data manager has been shutdown. return; } - if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && (currentOffset == null || latestOffset == null)) { + if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && currentOffset == null) { // Do not publish metrics if stream does not return valid ingestion time or offset. 
return; } @@ -285,12 +371,28 @@ public void updateIngestionMetrics(String segmentName, int partitionId, long ing ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId)); } - if (currentOffset != null && latestOffset != null) { + if (_enableOffsetLagMetric) { _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionId)); } + + StreamMetadataProvider streamMetadataProvider = createPartitionMetadataProvider("IngestionOffsetLagCalculation", + segmentName + "_consumer_ingestionDelayTracker", partitionId); + IngestionInfo ingestionInfo = + new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, streamMetadataProvider); + + if (streamMetadataProvider != null) { + StreamPartitionMsgOffset latestOffset = fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, + MAX_OFFSET_FETCH_WAIT_TIME_MS, streamMetadataProvider); + ingestionInfo.updateLatestOffset(latestOffset); + } + + return ingestionInfo; + } else { + v.updateIngestionTimes(ingestionTimeMs, firstStreamIngestionTimeMs); + v.updateCurrentOffset(currentOffset); + return v; } - return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset); }); // If we are consuming we do not need to track this partition for removal. @@ -351,6 +453,34 @@ public void timeoutInactivePartitions() { } } + /** + * Updates the latest offsets for each partition at a configurable frequency to reduce load. 
+ */ + private void updateLatestOffsets() { + if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) { + return; + } + for (Map.Entry entry : _ingestionInfoMap.entrySet()) { + int partitionId = entry.getKey(); + IngestionInfo ingestionInfo = entry.getValue(); + StreamMetadataProvider streamMetadataProvider = ingestionInfo._streamMetadataProvider; + if (streamMetadataProvider != null) { + try { + StreamPartitionMsgOffset latestOffset = + fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, MAX_OFFSET_FETCH_WAIT_TIME_MS, + streamMetadataProvider); + // fetchStreamOffset returns null when the fetch fails; keep the last known offset in that case so a + // transient failure does not reset the reported offset lag to 0. + if (latestOffset != null) { + ingestionInfo.updateLatestOffset(latestOffset); + } + } catch (Exception e) { + LOGGER.debug("Failed to fetch latest offset for partition {}", partitionId, e); + } + } + } + } + /** * This function is invoked when a segment goes from CONSUMING to ONLINE, so we can assert whether the partition of * the segment is still hosted by this server after some interval of time. @@ -400,20 +530,35 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionId) { } public long getPartitionIngestionOffsetLag(int partitionId) { - IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); - if (ingestionInfo == null) { - return 0; - } - StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset; - StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset; - if (currentOffset == null || latestOffset == null) { - return 0; - } - // TODO: Support other types of offsets - if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) { + try { + IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); + if (ingestionInfo == null) { + return 0; + } + StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset; + StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset; + if (currentOffset == null || latestOffset == null) { + return 0; + } + // TODO: Support other types of offsets + if (!(currentOffset instanceof LongMsgOffset 
&& latestOffset instanceof LongMsgOffset)) { + return 0; + } + long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); + + if (offsetLag < 0) { + LOGGER.debug( + "Offset lag for partition {} is negative: currentOffset={}, latestOffset={}. This is most likely due to " + + "latestOffset not being updated", + partitionId, currentOffset, latestOffset); + return 0; + } + + return offsetLag; + } catch (Exception e) { + LOGGER.warn("Failed to compute ingestion offset lag for partition {}", partitionId, e); return 0; } - return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); } /* diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index ecf5cb12cd8..b493f2cd30e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1842,10 +1842,8 @@ private void createPartitionMetadataProvider(String reason) { private void updateIngestionMetrics(RowMetadata metadata) { if (metadata != null) { try { - StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true); _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, - metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(), - latestOffset); + metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset()); } catch (Exception e) { - _segmentLogger.warn("Failed to fetch latest offset for updating ingestion delay", e); + _segmentLogger.warn("Failed to update ingestion delay metrics", e); } @@ -1859,7 +1857,7 @@ private void updateIngestionMetrics(RowMetadata metadata) { private void setIngestionDelayToZero() { long currentTimeMs = System.currentTimeMillis(); 
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, currentTimeMs, currentTimeMs, - null, null); + _currentOffset); } // This should be done during commit? We may not always commit when we build a segment.... diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index afc1e445291..c6f6fc0c2b1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -77,7 +77,6 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.RowMetadata; -import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; @@ -143,7 +142,7 @@ protected void doInit() { _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType); // Tracks ingestion delay of all partitions being served for this table _ingestionDelayTracker = - new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries); + new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries, _tableConfig); File statsFile = new File(_tableDataDir, STATS_FILE_NAME); try { _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile); @@ -284,13 +283,11 @@ protected void doShutdown() { * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from * {@link RowMetadata}) * @param currentOffset offset of the last consumed message (from {@link RowMetadata}) - * @param latestOffset 
offset of the latest message in the partition (from {@link StreamMetadataProvider}) */ public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs, - long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset, - @Nullable StreamPartitionMsgOffset latestOffset) { + long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) { _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs, - currentOffset, latestOffset); + currentOffset); } /** diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 9cb527b121d..81c2fef3738 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -18,19 +18,40 @@ */ package org.apache.pinot.core.data.manager.realtime; +import com.google.common.base.Supplier; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; 
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class IngestionDelayTrackerTest { @@ -38,22 +59,85 @@ public class IngestionDelayTrackerTest { private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100; - private final ServerMetrics _serverMetrics = mock(ServerMetrics.class); - private final RealtimeTableDataManager _realtimeTableDataManager = mock(RealtimeTableDataManager.class); + private ServerMetrics _serverMetrics; + private RealtimeTableDataManager _realtimeTableDataManager; + // Mocks for StreamConsumerFactory and StreamMetadataProvider + private StreamConsumerFactory _mockStreamConsumerFactory; + private StreamMetadataProvider _mockStreamMetadataProvider; + + // MockedStatic for StreamConsumerFactoryProvider + private MockedStatic _mockedFactoryProvider; + + @BeforeMethod + public void setUp() throws Exception { + _serverMetrics = mock(ServerMetrics.class); + _realtimeTableDataManager = mock(RealtimeTableDataManager.class); + + // Initialize mocks for StreamConsumerFactory and StreamMetadataProvider + _mockStreamConsumerFactory = mock(StreamConsumerFactory.class); + _mockStreamMetadataProvider = mock(StreamMetadataProvider.class); + + // Mock the static method StreamConsumerFactoryProvider.create() + _mockedFactoryProvider = 
Mockito.mockStatic(StreamConsumerFactoryProvider.class); + _mockedFactoryProvider.when(() -> StreamConsumerFactoryProvider.create(any())) + .thenReturn(_mockStreamConsumerFactory); + + // When createPartitionMetadataProvider is called, return the mock StreamMetadataProvider + when(_mockStreamConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())) + .thenReturn(_mockStreamMetadataProvider); + + // Default behavior for fetchStreamPartitionOffset + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(100L)); + } + + @AfterMethod + public void tearDown() { + // Close the mocked static after each test + _mockedFactoryProvider.close(); + } + + /** + * Helper method to create a TableConfig with necessary stream configurations. + */ + private TableConfig createTableConfig() { + Map streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", "kafka"); + streamConfigMap.put("stream.kafka.topic.name", RAW_TABLE_NAME); + streamConfigMap.put("stream.kafka.decoder.class.name", + "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"); + + StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(Collections.singletonList(streamConfigMap)); + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); + return new TableConfigBuilder(TableType.REALTIME) + .setTableName(RAW_TABLE_NAME) + .setIngestionConfig(ingestionConfig) + .build(); + } + + /** + * Updated createTracker method to pass TableConfig and handle mocks. 
+ */ private IngestionDelayTracker createTracker() { + TableConfig tableConfig = createTableConfig(); + + // Supplier to indicate the server is ready to serve queries + Supplier isServerReadyToServeQueries = () -> true; + + // Create the IngestionDelayTracker with the new constructor IngestionDelayTracker ingestionDelayTracker = - new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, () -> true); - // With no samples, the time reported must be zero - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0); + new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, + TIMER_THREAD_TICK_INTERVAL_MS, isServerReadyToServeQueries, tableConfig); + return ingestionDelayTracker; } @Test - public void testTrackerConstructors() { - // Test regular constructor - IngestionDelayTracker ingestionDelayTracker = - new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, () -> true); + public void testTrackerConstructors() throws Exception { + // Test regular constructor with TableConfig + IngestionDelayTracker ingestionDelayTracker = createTracker(); Clock clock = Clock.systemUTC(); ingestionDelayTracker.setClock(clock); @@ -61,24 +145,27 @@ public void testTrackerConstructors() { Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE); ingestionDelayTracker.shutdown(); - // Test constructor with timer arguments + + // Test constructor with timer arguments and TableConfig ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, - TIMER_THREAD_TICK_INTERVAL_MS, () -> true); + TIMER_THREAD_TICK_INTERVAL_MS, () -> true, createTableConfig()); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), 
Long.MIN_VALUE); + // Test bad timer args to the constructor try { - new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, 0, () -> true); + new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, 0, () -> true, + createTableConfig()); Assert.fail("Must have asserted due to invalid arguments"); // Constructor must assert + } catch (RuntimeException e) { + // Expected exception, test passes } catch (Exception e) { - if ((e instanceof NullPointerException) || !(e instanceof RuntimeException)) { - Assert.fail(String.format("Unexpected exception: %s:%s", e.getClass(), e.getMessage())); - } + Assert.fail(String.format("Unexpected exception: %s:%s", e.getClass(), e.getMessage())); } } @Test - public void testRecordIngestionDelayWithNoAging() { + public void testRecordIngestionDelayWithNoAging() throws Exception { final long maxTestDelay = 100; final int partition0 = 0; final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName(); @@ -86,60 +173,37 @@ public void testRecordIngestionDelayWithNoAging() { final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName(); IngestionDelayTracker ingestionDelayTracker = createTracker(); - // Use fixed clock so samples dont age + // Use fixed clock so samples don't age Instant now = Instant.now(); ZoneId zoneId = ZoneId.systemDefault(); - Clock clock = Clock.fixed(now, zoneId); - ingestionDelayTracker.setClock(clock); + Clock fixedClock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(fixedClock); + + // Mock fetchStreamPartitionOffset to return a fixed latest offset for testing + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(200L)); // Example latest offset // Test we follow a single partition up and down for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay; ingestionTimeMs++) { long firstStreamIngestionTimeMs 
= ingestionTimeMs + 1; ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs, - null, null); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), - clock.millis() - ingestionTimeMs); - Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), - clock.millis() - firstStreamIngestionTimeMs); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); - } - - // Test tracking down a measure for a given partition - for (long ingestionTimeMs = maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) { - long firstStreamIngestionTimeMs = ingestionTimeMs + 1; - ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs, - null, null); + new LongMsgOffset(ingestionTimeMs)); // currentOffset as ingestionTimeMs for testing Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), - clock.millis() - ingestionTimeMs); + fixedClock.millis() - ingestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), - clock.millis() - (ingestionTimeMs + 1)); + fixedClock.millis() - firstStreamIngestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); } - // Make the current partition maximum - ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, maxTestDelay, maxTestDelay, null, null); - - // Bring up partition1 delay up and verify values - for (long ingestionTimeMs = 0; ingestionTimeMs <= 2 * maxTestDelay; ingestionTimeMs++) { - long firstStreamIngestionTimeMs = ingestionTimeMs + 1; - ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs, - null, null); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), - clock.millis() - ingestionTimeMs); - 
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), - clock.millis() - firstStreamIngestionTimeMs); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs); - } - - // Bring down values of partition1 and verify values - for (long ingestionTimeMs = 2 * maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) { + // Test tracking another partition + for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay; ingestionTimeMs++) { long firstStreamIngestionTimeMs = ingestionTimeMs + 1; ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs, - null, null); + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), - clock.millis() - ingestionTimeMs); + fixedClock.millis() - ingestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), - clock.millis() - firstStreamIngestionTimeMs); + fixedClock.millis() - firstStreamIngestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs); } @@ -148,7 +212,7 @@ public void testRecordIngestionDelayWithNoAging() { } @Test - public void testRecordIngestionDelayWithAging() { + public void testRecordIngestionDelayWithAging() throws Exception { final int partition0 = 0; final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName(); final long partition0Delay0 = 1000; @@ -162,28 +226,35 @@ public void testRecordIngestionDelayWithAging() { IngestionDelayTracker ingestionDelayTracker = createTracker(); - // With samples for a single partition, test that sample is aged as expected + // Use fixed clock so samples don't age Instant now = Instant.now(); ZoneId zoneId = ZoneId.systemDefault(); - Clock clock = Clock.fixed(now, zoneId); - ingestionDelayTracker.setClock(clock); - long ingestionTimeMs = 
clock.millis() - partition0Delay0; - ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null); + Clock fixedClock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(fixedClock); + + // Mock fetchStreamPartitionOffset to return latest offsets + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(500L)); // Example latest offset + + long ingestionTimeMs = fixedClock.millis() - partition0Delay0; + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); // Advance clock and test aging - Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms)); + Clock offsetClock = Clock.offset(fixedClock, Duration.ofMillis(partition0Offset0Ms)); ingestionDelayTracker.setClock(offsetClock); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), - (partition0Delay0 + partition0Offset0Ms)); + partition0Delay0 + partition0Offset0Ms); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), - (partition0Delay0 + partition0Offset0Ms)); + partition0Delay0 + partition0Offset0Ms); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); ingestionTimeMs = offsetClock.millis() - partition0Delay1; - ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null); + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); 
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay1); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); @@ -192,10 +263,11 @@ public void testRecordIngestionDelayWithAging() { offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms)); ingestionDelayTracker.setClock(offsetClock); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), - (partition0Delay1 + partition0Offset1Ms)); + partition0Delay1 + partition0Offset1Ms); ingestionTimeMs = offsetClock.millis() - partition1Delay0; - ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, ingestionTimeMs, null, null); + ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), partition1Delay0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs); @@ -204,13 +276,13 @@ public void testRecordIngestionDelayWithAging() { offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms)); ingestionDelayTracker.setClock(offsetClock); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), - (partition1Delay0 + partition1Offset0Ms)); + partition1Delay0 + partition1Offset0Ms); ingestionDelayTracker.shutdown(); } @Test - public void testStopTrackingIngestionDelay() { + public void testStopTrackingIngestionDelay() throws Exception { final long maxTestDelay = 100; final int maxPartition = 100; @@ -218,15 +290,19 @@ public void testStopTrackingIngestionDelay() { // Use fixed clock so 
samples don't age Instant now = Instant.now(); ZoneId zoneId = ZoneId.systemDefault(); - Clock clock = Clock.fixed(now, zoneId); - ingestionDelayTracker.setClock(clock); + Clock fixedClock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(fixedClock); + + // Mock fetchStreamPartitionOffset to return latest offsets + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(500L)); // Record a number of partitions with delay equal to partition id for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) { String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName(); - long ingestionTimeMs = clock.millis() - partitionId; - ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null, - null); + long ingestionTimeMs = fixedClock.millis() - partitionId; + ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), ingestionTimeMs); @@ -243,17 +319,22 @@ public void testStopTrackingIngestionDelay() { } @Test - public void testStopTrackingIngestionDelayWithSegment() { + public void testStopTrackingIngestionDelayWithSegment() throws Exception { IngestionDelayTracker ingestionDelayTracker = createTracker(); // Use fixed clock so samples don't age Instant now = Instant.now(); ZoneId zoneId = ZoneId.systemDefault(); - Clock clock = Clock.fixed(now, zoneId); - ingestionDelayTracker.setClock(clock); + Clock fixedClock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(fixedClock); + + // Mock fetchStreamPartitionOffset to return latest offsets + 
when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(500L)); String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123).getSegmentName(); - long ingestionTimeMs = clock.millis() - 10; - ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null); + long ingestionTimeMs = fixedClock.millis() - 10; + ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 10); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 10); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), ingestionTimeMs); @@ -264,29 +345,34 @@ public void testStopTrackingIngestionDelayWithSegment() { Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE); // Should not update metrics for removed segment - ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null); + ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE); } @Test - public void testShutdown() { + public void testShutdown() throws Exception { final long maxTestDelay = 100; IngestionDelayTracker ingestionDelayTracker = createTracker(); // Use fixed clock so samples don't age Instant now = Instant.now(); ZoneId zoneId = ZoneId.systemDefault(); - Clock clock = Clock.fixed(now, zoneId); - ingestionDelayTracker.setClock(clock); + Clock fixedClock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(fixedClock); + + 
// Mock fetchStreamPartitionOffset to return latest offsets + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(500L)); // Test Shutdown with partitions active for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) { String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName(); - long ingestionTimeMs = clock.millis() - partitionId; - ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null, - null); + long ingestionTimeMs = fixedClock.millis() - partitionId; + ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, + new LongMsgOffset(ingestionTimeMs)); Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId); } @@ -298,7 +384,7 @@ public void testShutdown() { } @Test - public void testRecordIngestionDelayOffset() { + public void testRecordIngestionDelayOffset() throws Exception { final int partition0 = 0; final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName(); final int partition1 = 1; @@ -306,26 +392,26 @@ public void testRecordIngestionDelayOffset() { IngestionDelayTracker ingestionDelayTracker = createTracker(); + // Mock fetchStreamPartitionOffset to return latest offsets + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())) + .thenReturn(new LongMsgOffset(200L)); // Example latest offset + // Test tracking offset lag for a single partition StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100); - StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200); - ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0, - latestOffset0); - 
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100); + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100); // 200 - 100 // Test tracking offset lag for another partition StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50); - StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150); - ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1, - latestOffset1); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100); + ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 150); // 200 - 50 // Update offset lag for partition0 - msgOffset0 = new LongMsgOffset(150); - latestOffset0 = new LongMsgOffset(200); - ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0, - latestOffset0); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50); + when(_mockStreamMetadataProvider.fetchStreamPartitionOffset(any(), anyLong())).thenReturn( + new LongMsgOffset(150L)); // New latest offset + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, + new LongMsgOffset(150L)); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50); // 150 - 100 ingestionDelayTracker.shutdown(); }