diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1ad8d733ddea07..4be4f7ed80654a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -113,6 +113,7 @@ import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.statistics.UpdatePartitionStatsTarget; import org.apache.doris.statistics.query.QueryStats; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; @@ -3395,8 +3396,9 @@ public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions)); } if (target.isTruncate) { - analysisManager.submitAsyncDropStatsTask(target.catalogId, target.dbId, - target.tableId, tableStats, partitionNames); + TableIf table = StatisticsUtil.findTable(target.catalogId, target.dbId, target.tableId); + analysisManager.submitAsyncDropStatsTask(table, target.catalogId, target.dbId, + target.tableId, tableStats, partitionNames, false); } else { analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats, partitionNames); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index ece9daf25204c3..d92179cf2f09c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -159,12 +159,12 @@ public class AnalysisManager implements Writable { public AnalysisManager() { if (!Env.isCheckpointThread()) { this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num, - Integer.MAX_VALUE); + Integer.MAX_VALUE, "Manual Analysis Job Executor"); this.statisticsCache = new StatisticsCache(); this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool( - 1, 1, 0, - TimeUnit.DAYS, new LinkedBlockingQueue<>(10), - new ThreadPoolExecutor.AbortPolicy(), + 1, 3, 10, + TimeUnit.DAYS, new LinkedBlockingQueue<>(20), + new ThreadPoolExecutor.DiscardPolicy(), "Drop stats executor", true); } } @@ -696,20 +696,7 @@ public void dropStats(TableIf table, PartitionNames partitionNames) { long catalogId = table.getDatabase().getCatalog().getId(); long dbId = table.getDatabase().getId(); long tableId = table.getId(); - if (!table.isPartitionedTable() || partitionNames == null - || partitionNames.isStar() || partitionNames.getPartitionNames() == null) { - removeTableStats(tableId); - Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId)); - } - submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, partitionNames); - // Drop stats ddl is master only operation. - Set partitions = null; - if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) { - partitions = new HashSet<>(partitionNames.getPartitionNames()); - } - // Drop stats ddl is master only operation. - invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true); - StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), null, partitions); + submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true); } catch (Throwable e) { LOG.warn("Failed to drop stats for table {}", table.getName(), e); } @@ -722,30 +709,55 @@ class DropStatsTask implements Runnable { private final Set columns; private final TableStatsMeta tableStats; private final PartitionNames partitionNames; + private final TableIf table; + private final boolean isMaster; - public DropStatsTask(long catalogId, long dbId, long tableId, Set columns, - TableStatsMeta tableStats, PartitionNames partitionNames) { + public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, Set columns, + TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) { this.catalogId = catalogId; this.dbId = dbId; this.tableId = tableId; this.columns = columns; this.tableStats = tableStats; this.partitionNames = partitionNames; + this.table = table; + this.isMaster = isMaster; } @Override public void run() { - invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames); + try { + if (isMaster) { + if (!table.isPartitionedTable() || partitionNames == null + || partitionNames.isStar() || partitionNames.getPartitionNames() == null) { + removeTableStats(tableId); + Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId)); + } + // Drop stats ddl is master only operation. + Set partitions = null; + if (partitionNames != null && !partitionNames.isStar() + && partitionNames.getPartitionNames() != null) { + partitions = new HashSet<>(partitionNames.getPartitionNames()); + } + // Drop stats ddl is master only operation. + StatisticsRepository.dropStatistics(catalogId, dbId, tableId, null, partitions); + invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true); + } + invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames); + } catch (Throwable t) { + LOG.info("Failed to async drop stats for table {}.{}.{}, reason: {}", + catalogId, dbId, tableId, t.getMessage()); + } } } - public void submitAsyncDropStatsTask(long catalogId, long dbId, long tableId, - TableStatsMeta tableStats, PartitionNames partitionNames) { + public void submitAsyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId, + TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) { try { - dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, tableId, null, tableStats, partitionNames)); + dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, null, + tableStats, partitionNames, isMaster)); } catch (Throwable t) { - LOG.info("Failed to drop stats for truncate table {}.{}.{}. Reason:{}", - catalogId, dbId, tableId, t.getMessage()); + LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 5b87608ba51ab7..bc1126e9c51ba9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -44,17 +44,17 @@ public class AnalysisTaskExecutor { Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) { - this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE); + this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor"); } - public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) { + public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) { if (!Env.isCheckpointThread()) { executors = ThreadPoolManager.newDaemonThreadPool( simultaneouslyRunningTaskNum, simultaneouslyRunningTaskNum, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize), new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE), - "Analysis Job Executor", true); + poolName, true); cancelExpiredTask(); } else { executors = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 5a231db24d3798..f4fdc68f55c220 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -62,7 +62,7 @@ public class StatisticsAutoCollector extends MasterDaemon { public StatisticsAutoCollector() { super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes)); this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, - StatisticConstants.TASK_QUEUE_CAP); + StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor"); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 898c3af0fdf0d2..703311c850db57 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -659,11 +659,11 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set 10); - Assertions.assertTrue(count.get() < 20); + Assertions.assertTrue(count.get() > 0); + Assertions.assertTrue(count.get() <= 20); } } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 3916dd116ad1da..1078cbf218abf4 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -19,6 +19,22 @@ import java.util.stream.Collectors suite("test_analyze") { + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + String db = "test_analyze" String tbl = "analyzetestlimited_duplicate_all" @@ -1152,6 +1168,8 @@ PARTITION `p599` VALUES IN (599) ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL; """ + stats_dropped("analyze_test_with_schema_update") + sql """ ANALYZE TABLE analyze_test_with_schema_update WITH SYNC """ @@ -1349,6 +1367,7 @@ PARTITION `p599` VALUES IN (599) def result_before_truncate = sql """show column stats ${tbl}""" assertEquals(14, result_before_truncate.size()) sql """TRUNCATE TABLE ${tbl}""" + stats_dropped(tbl) def result_after_truncate = sql """show column stats ${tbl}""" assertEquals(0, result_after_truncate.size()) result_after_truncate = sql """show column cached stats ${tbl}""" @@ -1375,6 +1394,7 @@ PARTITION `p599` VALUES IN (599) asserttruncate_test_result[0][8].substring(1, 1025) sql """TRUNCATE TABLE ${tbl}""" + stats_dropped(tbl) result_after_truncate = sql """show column stats ${tbl}""" assertEquals(0, result_after_truncate.size()) sql """ANALYZE TABLE ${tbl} WITH SYNC""" diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy b/regression-test/suites/statistics/test_analyze_mv.groovy index 3a8f7335375aa6..daa8d780aee130 100644 --- a/regression-test/suites/statistics/test_analyze_mv.groovy +++ b/regression-test/suites/statistics/test_analyze_mv.groovy @@ -108,6 +108,22 @@ suite("test_analyze_mv") { assertTrue(found) } + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + sql """drop database if exists test_analyze_mv""" sql """create database test_analyze_mv""" sql """use test_analyze_mv""" @@ -674,6 +690,7 @@ suite("test_analyze_mv") { // * Test row count report and report for nereids sql """truncate table mvTestDup""" result_row = sql """show index stats mvTestDup mv3""" + stats_dropped("mvTestDup") assertEquals(1, result_row.size()) assertEquals("mvTestDup", result_row[0][0]) assertEquals("mv3", result_row[0][1]) diff --git a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy index 6dc3c6d179741a..31c1e42390024f 100644 --- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy +++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy @@ -17,6 +17,22 @@ suite("test_drop_stats_and_truncate") { + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + sql """drop database if exists test_drop_stats_and_truncate""" sql """create database test_drop_stats_and_truncate""" sql """use test_drop_stats_and_truncate""" @@ -101,6 +117,7 @@ suite("test_drop_stats_and_truncate") { assertEquals(3, columns.size()) sql """truncate table non_part""" + stats_dropped("non_part") result = sql """show column stats non_part""" assertEquals(0, result.size()) result = sql """show table stats non_part""" @@ -148,6 +165,7 @@ suite("test_drop_stats_and_truncate") { assertEquals(9, columns.size()) sql """truncate table part""" + stats_dropped("part") result = sql """show column stats part""" assertEquals(0, result.size()) result = sql """show table stats part"""