Skip to content

Commit

Permalink
Add todos back
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartik Khare authored and Kartik Khare committed Oct 17, 2024
1 parent f96a592 commit 6ccff15
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,14 @@ public class IngestionDelayTracker {
public static final int MAX_OFFSET_FETCH_WAIT_TIME_MS = 5000;

private static class IngestionInfo {
final StreamMetadataProvider _streamMetadataProvider;
volatile Long _ingestionTimeMs;
volatile Long _firstStreamIngestionTimeMs;
volatile StreamPartitionMsgOffset _currentOffset;
volatile StreamPartitionMsgOffset _latestOffset;
final StreamMetadataProvider _streamMetadataProvider;

IngestionInfo(@Nullable Long ingestionTimeMs, @Nullable Long firstStreamIngestionTimeMs,
@Nullable StreamPartitionMsgOffset currentOffset,
@Nullable StreamMetadataProvider streamMetadataProvider) {
IngestionInfo(Long ingestionTimeMs, Long firstStreamIngestionTimeMs, StreamPartitionMsgOffset currentOffset,
StreamMetadataProvider streamMetadataProvider) {
_ingestionTimeMs = ingestionTimeMs;
_firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
_currentOffset = currentOffset;
Expand Down Expand Up @@ -149,11 +148,14 @@ void updateIngestionTimes(long ingestionTimeMs, long firstStreamIngestionTimeMs)
// We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not
// go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
// TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay}
// instead
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();

private final Cache<String, Boolean> _segmentsToIgnore =
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build();

// TODO: Make thread pool a server/cluster level config
// ScheduledExecutorService to check partitions that are inactive against ideal state.
private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2);

Expand Down

0 comments on commit 6ccff15

Please sign in to comment.