Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ingestion delay configurable, with concurrency fixes #14142

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.data.manager.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.Clock;
Expand All @@ -37,15 +38,21 @@
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A Class to track realtime ingestion delay for table partitions on a given server.
* Highlights:
Expand Down Expand Up @@ -83,22 +90,37 @@
*
* TODO: handle bug situations like the one where a partition is not allocated to a given server due to a bug.
*/

public class IngestionDelayTracker {

private static class IngestionInfo {
final long _ingestionTimeMs;
final long _firstStreamIngestionTimeMs;
final StreamPartitionMsgOffset _currentOffset;
final StreamPartitionMsgOffset _latestOffset;
public static final int MAX_OFFSET_FETCH_WAIT_TIME_MS = 5000;

IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset, @Nullable StreamPartitionMsgOffset latestOffset) {
/**
 * Per-partition ingestion state: last-consumed ingestion timestamps, the last consumed offset,
 * the most recently fetched latest stream offset, and the metadata provider used to fetch it.
 * Fields are volatile because they are written by different threads (the consuming thread updates
 * ingestion times / current offset; the scheduled offset-lag updater writes the latest offset)
 * and read by metric gauge callbacks.
 */
private static class IngestionInfo {
  // Provider used to fetch the latest offset for this partition; closed when the partition is
  // no longer tracked.
  final StreamMetadataProvider _streamMetadataProvider;
  volatile Long _ingestionTimeMs;
  volatile Long _firstStreamIngestionTimeMs;
  volatile StreamPartitionMsgOffset _currentOffset;
  volatile StreamPartitionMsgOffset _latestOffset;

  IngestionInfo(Long ingestionTimeMs, Long firstStreamIngestionTimeMs, StreamPartitionMsgOffset currentOffset,
      StreamMetadataProvider streamMetadataProvider) {
    _ingestionTimeMs = ingestionTimeMs;
    _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
    _currentOffset = currentOffset;
    _streamMetadataProvider = streamMetadataProvider;
  }

  // Called from the consuming path when a new message is consumed.
  void updateCurrentOffset(StreamPartitionMsgOffset currentOffset) {
    _currentOffset = currentOffset;
  }

  // Called from the scheduled offset-lag updater thread.
  void updateLatestOffset(StreamPartitionMsgOffset latestOffset) {
    _latestOffset = latestOffset;
  }

  // Called from the consuming path; both timestamps are updated together.
  void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs) {
    _ingestionTimeMs = ingestionTimeMs;
    _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
  }
}

private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class);
Expand All @@ -112,6 +134,13 @@ private static class IngestionInfo {

// Cache expire time for ignored segment if there is no update from the segment.
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY = "offset.lag.tracking.enable";
public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY = "offset.lag.tracking.update.interval";

// Since offset lag metric does a call to Kafka, we want to make sure we don't do it too frequently.
public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = true;
public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1 minute
public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L;

// Per partition info for all partitions active for the current table.
private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>();
Expand All @@ -120,7 +149,7 @@ private static class IngestionInfo {
// go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
// TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay}
KKcorps marked this conversation as resolved.
Show resolved Hide resolved
// instead.
// instead
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();

private final Cache<String, Boolean> _segmentsToIgnore =
Expand All @@ -139,17 +168,44 @@ private static class IngestionInfo {

private Clock _clock;

// Configuration parameters
private final boolean _enableOffsetLagMetric;
private final long _offsetLagUpdateIntervalMs;

private final StreamConsumerFactory _streamConsumerFactory;

@VisibleForTesting
public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs,
Supplier<Boolean> isServerReadyToServeQueries)
Supplier<Boolean> isServerReadyToServeQueries, TableConfig tableConfig)
throws RuntimeException {
_serverMetrics = serverMetrics;
_tableNameWithType = tableNameWithType;
_metricName = tableNameWithType;
_realTimeTableDataManager = realtimeTableDataManager;
_clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;

StreamConfig streamConfig =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should directly pass in StreamConfig to avoid parsing it multiple times

new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(tableConfig));
_streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);

if (realtimeTableDataManager.getInstanceDataManagerConfig() != null
&& realtimeTableDataManager.getInstanceDataManagerConfig().getConfig() != null) {
PinotConfiguration pinotConfiguration = realtimeTableDataManager.getInstanceDataManagerConfig().getConfig();
_enableOffsetLagMetric =
pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY, DEFAULT_ENABLE_OFFSET_LAG_METRIC);
_offsetLagUpdateIntervalMs = pinotConfiguration.getProperty(OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY,
DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS);

Preconditions.checkArgument(_offsetLagUpdateIntervalMs > MIN_OFFSET_LAG_UPDATE_INTERVAL,
String.format("Value of Offset lag update interval config: %s must be greater than %d",
OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY, MIN_OFFSET_LAG_UPDATE_INTERVAL));
} else {
_enableOffsetLagMetric = DEFAULT_ENABLE_OFFSET_LAG_METRIC;
_offsetLagUpdateIntervalMs = DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS;
}

// Handle negative timer values
if (scheduledExecutorThreadTickIntervalMs <= 0) {
throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
Expand All @@ -171,12 +227,36 @@ public Thread newThread(Runnable r) {

_scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);

if (_enableOffsetLagMetric) {
_scheduledExecutor.scheduleWithFixedDelay(this::updateLatestOffsets,
0, _offsetLagUpdateIntervalMs, TimeUnit.MILLISECONDS);
}
}

public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries) {
RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries,
TableConfig tableConfig) {
this(serverMetrics, tableNameWithType, tableDataManager, SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS,
isServerReadyToServeQueries);
isServerReadyToServeQueries, tableConfig);
}

/**
 * Best-effort fetch of a stream offset matching the given criteria.
 *
 * @return the fetched offset, or {@code null} if the fetch failed within the given wait time
 */
private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs,
    StreamMetadataProvider streamMetadataProvider) {
  StreamPartitionMsgOffset fetchedOffset = null;
  try {
    fetchedOffset = streamMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, maxWaitTimeMs);
  } catch (Exception e) {
    // Swallow and return null: offset lag is a best-effort metric and must not disturb ingestion.
    LOGGER.debug("Caught exception while fetching stream offset", e);
  }
  return fetchedOffset;
}

/**
 * Creates a new {@link StreamMetadataProvider} for the given partition group.
 *
 * @param reason human-readable reason, logged for traceability
 * @param clientId client id to register with the stream consumer factory
 * @param partitionGroupId partition group to create the provider for
 */
private StreamMetadataProvider createPartitionMetadataProvider(String reason, String clientId, int partitionGroupId) {
  LOGGER.info("Creating new partition metadata provider, reason: {}", reason);
  StreamMetadataProvider provider = _streamConsumerFactory.createPartitionMetadataProvider(clientId, partitionGroupId);
  return provider;
}

/*
Expand Down Expand Up @@ -210,6 +290,14 @@ private void removePartitionId(int partitionId) {
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);

if (v._streamMetadataProvider != null) {
try {
v._streamMetadataProvider.close();
} catch (Exception e) {
LOGGER.warn("Caught exception while closing stream metadata provider for partitionId: {}", partitionId, e);
}
}
}
return null;
});
Expand Down Expand Up @@ -253,18 +341,16 @@ void setClock(Clock clock) {
* @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
* {@link RowMetadata})
* @param currentOffset offset of the last consumed message (from {@link RowMetadata})
* @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider})
*/
public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamPartitionMsgOffset latestOffset) {
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) {
if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup period
// or once the table data manager has been shutdown.
return;
}

if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && (currentOffset == null || latestOffset == null)) {
if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && currentOffset == null) {
// Do not publish metrics if stream does not return valid ingestion time or offset.
return;
}
Expand All @@ -285,12 +371,28 @@ public void updateIngestionMetrics(String segmentName, int partitionId, long ing
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
() -> getPartitionEndToEndIngestionDelayMs(partitionId));
}
if (currentOffset != null && latestOffset != null) {
if (_enableOffsetLagMetric) {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
() -> getPartitionIngestionOffsetLag(partitionId));
}

StreamMetadataProvider streamMetadataProvider = createPartitionMetadataProvider("IngestionOffsetLagCalculation",
segmentName + "_consumer_ingestionDelayTracker", partitionId);
IngestionInfo ingestionInfo =
new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, streamMetadataProvider);

if (streamMetadataProvider != null) {
StreamPartitionMsgOffset latestOffset =
fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 5000, streamMetadataProvider);
ingestionInfo.updateLatestOffset(latestOffset);
}

return ingestionInfo;
} else {
v.updateIngestionTimes(ingestionTimeMs, firstStreamIngestionTimeMs);
v.updateCurrentOffset(currentOffset);
return v;
}
return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset);
});

// If we are consuming we do not need to track this partition for removal.
Expand Down Expand Up @@ -351,6 +453,30 @@ public void timeoutInactivePartitions() {
}
}

/**
 * Periodically refreshes the latest stream offset for each tracked partition. Runs on the
 * scheduled executor at {@code _offsetLagUpdateIntervalMs} intervals (only when the offset lag
 * metric is enabled) to bound the rate of metadata calls to the stream (e.g. Kafka).
 *
 * Fix: a failed fetch previously overwrote a valid {@code _latestOffset} with {@code null},
 * flapping the reported lag to 0; we now keep the previous value on transient failures.
 * The redundant inner try/catch is removed — {@link #fetchStreamOffset} already catches
 * exceptions and returns {@code null}.
 */
private void updateLatestOffsets() {
  // Skip during server startup or after table shutdown, same as the metric update path.
  if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
    return;
  }
  for (Map.Entry<Integer, IngestionInfo> entry : _ingestionInfoMap.entrySet()) {
    IngestionInfo ingestionInfo = entry.getValue();
    StreamMetadataProvider streamMetadataProvider = ingestionInfo._streamMetadataProvider;
    if (streamMetadataProvider == null) {
      continue;
    }
    StreamPartitionMsgOffset latestOffset =
        fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, MAX_OFFSET_FETCH_WAIT_TIME_MS,
            streamMetadataProvider);
    // Keep the previously fetched offset on failure instead of clobbering it with null.
    if (latestOffset != null) {
      ingestionInfo.updateLatestOffset(latestOffset);
    }
  }
}

/**
* This function is invoked when a segment goes from CONSUMING to ONLINE, so we can assert whether the partition of
* the segment is still hosted by this server after some interval of time.
Expand Down Expand Up @@ -400,20 +526,35 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
}

public long getPartitionIngestionOffsetLag(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
if (ingestionInfo == null) {
return 0;
}
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
if (currentOffset == null || latestOffset == null) {
return 0;
}
// TODO: Support other types of offsets
if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
try {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
if (ingestionInfo == null) {
return 0;
}
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
Comment on lines +534 to +535
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) This is almost never accurate because current offset and latest offset are updated separately. We probably should directly track the lag and update the lag when fetching the latest offset.
Seems currentOffset is used only here, so we can just remove both offsets and only keep lag in IngestionInfo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the issue, that we can get the currentOffset only via the updateIngestionMetric method call
and the latest offset we are calculating in seperate periodic thread

if I just want to store the lag, i'll either have to calculate latestOffset every time updateIngestionMetrics is called with currentOffset for the partition. I can't do this since it'll defeat the purpose of this PR.

if (currentOffset == null || latestOffset == null) {
return 0;
}
// TODO: Support other types of offsets
if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) {
return 0;
}
long offsetLag = ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();

if (offsetLag < 0) {
LOGGER.debug(
"Offset lag for partition {} is negative: currentOffset={}, latestOffset={}. This is most likely due to "
+ "latestOffset not being updated",
partitionId, currentOffset, latestOffset);
return 0;
}

return offsetLag;
} catch (Exception e) {
LOGGER.warn("Failed to compute ingestion offset lag for partition {}", partitionId, e);
return 0;
}
return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,10 +1842,8 @@ private void createPartitionMetadataProvider(String reason) {
private void updateIngestionMetrics(RowMetadata metadata) {
if (metadata != null) {
try {
StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true);
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId,
metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
latestOffset);
metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset());
} catch (Exception e) {
_segmentLogger.warn("Failed to fetch latest offset for updating ingestion delay", e);
}
Expand All @@ -1859,7 +1857,7 @@ private void updateIngestionMetrics(RowMetadata metadata) {
private void setIngestionDelayToZero() {
long currentTimeMs = System.currentTimeMillis();
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, currentTimeMs, currentTimeMs,
null, null);
_currentOffset);
}

// This should be done during commit? We may not always commit when we build a segment....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
Expand Down Expand Up @@ -143,7 +142,7 @@ protected void doInit() {
_leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
// Tracks ingestion delay of all partitions being served for this table
_ingestionDelayTracker =
new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries);
new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, _isServerReadyToServeQueries, _tableConfig);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
try {
_statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
Expand Down Expand Up @@ -284,13 +283,11 @@ protected void doShutdown() {
* @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
* {@link RowMetadata})
* @param currentOffset offset of the last consumed message (from {@link RowMetadata})
* @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider})
*/
public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamPartitionMsgOffset latestOffset) {
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) {
_ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs,
currentOffset, latestOffset);
currentOffset);
}

/**
Expand Down
Loading
Loading