diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 4217329a314..f230c5ecce8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executor; import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.helix.AccessOption; import org.apache.helix.task.TaskState; @@ -702,6 +703,18 @@ public void cleanUpTask() { } } + @Override + protected void nonLeaderCleanup(List tableNamesWithType) { + LOGGER.info( + "Cleaning up all task generators for tables that the controller is not the leader for. Number of tables to be" + + " cleaned up: {}. Printing at most first 10 table names to be cleaned up: [{}].", + tableNamesWithType.size(), + StringUtils.join(tableNamesWithType.stream().limit(10).map(t -> "\"" + t + "\"").toArray(), ", ")); + for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) { + _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp(tableNamesWithType); + } + } + @Nullable public Scheduler getScheduler() { return _scheduler; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java index ad421358860..fd2a4615edd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java @@ -86,6 +86,13 @@ default int getMaxAttemptsPerTask() { default void nonLeaderCleanUp() { } + /** + * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes, + * given a list of tables that the current controller isn't the leader for. + */ + default void nonLeaderCleanUp(List tableNamesWithType) { + } + /** * Gets the minionInstanceTag for the tableConfig */