Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ingestion delay configurable: with concurrency fixes #14142

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

KKcorps
Copy link
Contributor

@KKcorps KKcorps commented Oct 2, 2024

We ran into some errors with previous version #14074 and hence it was reverted with #14127 . Here's the new PR with multiple fixes after the revert

The ingestion offset lag metric currently makes a request to kafka broker to get the latest offset on every update.

However, this has lead to an increasing amount of load on kafka brokers for multiple users.

The PR adds a way to enable/disable this metric and also configure its interval using the following properties in the server or cluster configs

pinot.server.instance.offset.lag.tracking.enable: true
pinot.server.instance.offset.lag.tracking.update.interval: 60000

@codecov-commenter
Copy link

codecov-commenter commented Oct 2, 2024

Codecov Report

Attention: Patch coverage is 56.09756% with 36 lines in your changes missing coverage. Please review.

Project coverage is 63.77%. Comparing base (59551e4) to head (6ccff15).
Report is 1195 commits behind head on master.

Files with missing lines Patch % Lines
...e/data/manager/realtime/IngestionDelayTracker.java 56.09% 24 Missing and 12 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14142      +/-   ##
============================================
+ Coverage     61.75%   63.77%   +2.02%     
- Complexity      207     1534    +1327     
============================================
  Files          2436     2624     +188     
  Lines        133233   144678   +11445     
  Branches      20636    22143    +1507     
============================================
+ Hits          82274    92270    +9996     
- Misses        44911    45589     +678     
- Partials       6048     6819     +771     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 55.37% <56.09%> (-6.34%) ⬇️
java-21 63.66% <45.12%> (+2.04%) ⬆️
skip-bytebuffers-false 63.75% <56.09%> (+2.00%) ⬆️
skip-bytebuffers-true 55.28% <45.12%> (+27.55%) ⬆️
temurin 63.77% <56.09%> (+2.02%) ⬆️
unittests 63.77% <56.09%> (+2.02%) ⬆️
unittests1 55.47% <56.09%> (+8.58%) ⬆️
unittests2 34.30% <0.00%> (+6.57%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Jackie-Jiang Jackie-Jiang added enhancement documentation Configuration Config changes (addition/deletion/change in behavior) ingestion labels Oct 7, 2024
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();

private final Cache<String, Boolean> _segmentsToIgnore =
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build();

// TODO: Make thread pool a server/cluster level config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we should keep this TODO

volatile StreamPartitionMsgOffset _latestOffset;
final StreamMetadataProvider _streamMetadataProvider;

IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are ingestionTimeMs or firstStreamIngestionTimeMs ever null?


IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamMetadataProvider streamMetadataProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamMetadataProvider is also always non-null, and we should be able to remove several unnecessary null checks

volatile Long _firstStreamIngestionTimeMs;
volatile StreamPartitionMsgOffset _currentOffset;
volatile StreamPartitionMsgOffset _latestOffset;
final StreamMetadataProvider _streamMetadataProvider;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Suggest moving this to the top since it is final

Comment on lines +532 to +533
StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) This is almost never accurate because current offset and latest offset are updated separately. We probably should directly track the lag and update the lag when fetching the latest offset.
Seems currentOffset is used only here, so we can just remove both offsets and only keep lag in IngestionInfo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the issue, that we can get the currentOffset only via the updateIngestionMetric method call
and the latest offset we are calculating in seperate periodic thread

if I just want to store the lag, i'll either have to calculate latestOffset every time updateIngestionMetrics is called with currentOffset for the partition. I can't do this since it'll defeat the purpose of this PR.

throws RuntimeException {
_serverMetrics = serverMetrics;
_tableNameWithType = tableNameWithType;
_metricName = tableNameWithType;
_realTimeTableDataManager = realtimeTableDataManager;
_clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;

StreamConfig streamConfig =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should directly pass in StreamConfig to avoid parsing it multiple times

@Jackie-Jiang
Copy link
Contributor

@swaminathanmanish Please help review this

@KKcorps KKcorps force-pushed the ingestion_delay_configured_resolved branch from 85de7c9 to 6ccff15 Compare October 17, 2024 13:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Configuration Config changes (addition/deletion/change in behavior) documentation enhancement ingestion
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants