diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 0e4a1c7b42d833..c43136a08f6197 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -115,6 +115,7 @@ public class AnalysisManager implements Writable { private StatisticsCache statisticsCache; private AnalysisTaskExecutor taskExecutor; + private ThreadPoolExecutor dropStatsExecutors; // Store task information in metadata. protected final NavigableMap analysisTaskInfoMap = @@ -136,8 +137,13 @@ public class AnalysisManager implements Writable { public AnalysisManager() { if (!Env.isCheckpointThread()) { this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num, - Integer.MAX_VALUE); + Integer.MAX_VALUE, "Manual Analysis Job Executor"); this.statisticsCache = new StatisticsCache(); + this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool( + 1, 3, 10, + TimeUnit.DAYS, new LinkedBlockingQueue<>(20), + new ThreadPoolExecutor.DiscardPolicy(), + "Drop stats executor", true); } } @@ -656,19 +662,52 @@ public void dropStats(TableIf table) { long catalogId = table.getDatabase().getCatalog().getId(); long dbId = table.getDatabase().getId(); long tableId = table.getId(); - removeTableStats(tableId); - Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId)); - Set cols = table.getSchemaAllIndexes(false).stream().map(Column::getName) - .collect(Collectors.toSet()); - invalidateLocalStats(catalogId, dbId, tableId, null, tableStats); - // Drop stats ddl is master only operation. - invalidateRemoteStats(catalogId, dbId, tableId, cols, true); - StatisticsRepository.dropStatisticsByColNames(catalogId, dbId, table.getId(), cols); + asyncDropStatsTask(table, catalogId, dbId, tableId, tableStats); } catch (Throwable e) { LOG.warn("Failed to drop stats for table {}", table.getName(), e); } } + class DropStatsTask implements Runnable { + private final long catalogId; + private final long dbId; + private final long tableId; + private final TableStatsMeta tableStats; + private final TableIf table; + + public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, TableStatsMeta tableStats) { + this.catalogId = catalogId; + this.dbId = dbId; + this.tableId = tableId; + this.tableStats = tableStats; + this.table = table; + } + + @Override + public void run() { + try { + removeTableStats(tableId); + Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId)); + Set cols = table.getSchemaAllIndexes(false).stream().map(Column::getName) + .collect(Collectors.toSet()); + StatisticsRepository.dropStatisticsByColNames(catalogId, dbId, table.getId(), cols); + invalidateLocalStats(catalogId, dbId, tableId, null, tableStats); + // Drop stats ddl is master only operation. + invalidateRemoteStats(catalogId, dbId, tableId, cols, true); + } catch (Throwable e) { + LOG.warn("Failed to drop stats for table {}", table.getName(), e); + } + } + } + + public void asyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId, TableStatsMeta tableStats) { + try { + dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, tableStats)); + } catch (Throwable t) { + LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage()); + } + } + public void dropCachedStats(long catalogId, long dbId, long tableId) { TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId); StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 3db9a862d100f4..23cfc0b6d11116 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -44,17 +44,17 @@ public class AnalysisTaskExecutor { Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) { - this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE); + this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor"); } - public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) { + public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) { if (!Env.isCheckpointThread()) { executors = ThreadPoolManager.newDaemonThreadPool( simultaneouslyRunningTaskNum, simultaneouslyRunningTaskNum, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize), new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), - "Analysis Job Executor", true); + poolName, true); cancelExpiredTask(); } else { executors = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 574b25da4228be..cc721720230886 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -58,7 +58,7 @@ public class StatisticsAutoCollector extends MasterDaemon { public StatisticsAutoCollector() { super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10)); this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, - StatisticConstants.TASK_QUEUE_CAP); + StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor"); } @Override diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 7f4b9abee4764f..6d845a11da46a0 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -19,6 +19,22 @@ import java.util.stream.Collectors suite("test_analyze") { + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + String db = "test_analyze" String tbl = "analyzetestlimited_duplicate_all" @@ -1159,6 +1175,8 @@ PARTITION `p599` VALUES IN (599) ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL; """ + stats_dropped("analyze_test_with_schema_update") + sql """ ANALYZE TABLE analyze_test_with_schema_update WITH SYNC """ @@ -1356,6 +1374,7 @@ PARTITION `p599` VALUES IN (599) def result_before_truncate = sql """show column stats ${tbl}""" assertEquals(14, result_before_truncate.size()) sql """TRUNCATE TABLE ${tbl}""" + stats_dropped(tbl) def result_after_truncate = sql """show column stats ${tbl}""" assertEquals(0, result_after_truncate.size()) result_after_truncate = sql """show column cached stats ${tbl}""" @@ -1382,6 +1401,7 @@ PARTITION `p599` VALUES IN (599) assert "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111" == truncate_test_result[0][8].substring(1, 1025) sql """TRUNCATE TABLE ${tbl}""" + stats_dropped(tbl) result_after_truncate = sql """show column stats ${tbl}""" assertEquals(0, result_after_truncate.size()) sql """ANALYZE TABLE ${tbl} WITH SYNC""" diff --git a/regression-test/suites/statistics/test_analyze_mv.groovy b/regression-test/suites/statistics/test_analyze_mv.groovy index 06a5de85befd14..7e978a0c2b9005 100644 --- a/regression-test/suites/statistics/test_analyze_mv.groovy +++ b/regression-test/suites/statistics/test_analyze_mv.groovy @@ -108,6 +108,22 @@ suite("test_analyze_mv") { assertTrue(found) } + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + sql """drop database if exists test_analyze_mv""" sql """create database test_analyze_mv""" sql """use test_analyze_mv""" @@ -679,6 +695,7 @@ suite("test_analyze_mv") { // Test row count report and report for nereids sql """truncate table mvTestDup""" result_row = sql """show index stats mvTestDup mv3""" + stats_dropped("mvTestDup") assertEquals(1, result_row.size()) assertEquals("mvTestDup", result_row[0][0]) assertEquals("mv3", result_row[0][1]) @@ -714,6 +731,7 @@ suite("test_analyze_mv") { // ** Embedded test for skip auto analyze when table is empty again sql """analyze table mvTestDup properties ("use.auto.analyzer" = "true")""" + stats_dropped("mvTestDup") empty_test = sql """show auto analyze mvTestDup""" assertEquals(0, empty_test.size()) empty_test = sql """show column stats mvTestDup""" diff --git a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy index 969e03cb295e0b..a6a9f4471a424a 100644 --- a/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy +++ b/regression-test/suites/statistics/test_drop_stats_and_truncate.groovy @@ -17,6 +17,22 @@ suite("test_drop_stats_and_truncate") { + def stats_dropped = { table -> + def result1 = sql """show column cached stats $table""" + def result2 = sql """show column stats $table""" + boolean dropped = false + for (int i = 0; i < 120; i++) { + if (0 == result1.size() && 0 == result2.size()) { + dropped = true; + break; + } + Thread.sleep(1000) + result1 = sql """show column cached stats $table""" + result2 = sql """show column stats $table""" + } + assertTrue(dropped) + } + sql """drop database if exists test_drop_stats_and_truncate""" sql """create database test_drop_stats_and_truncate""" sql """use test_drop_stats_and_truncate""" @@ -100,6 +116,7 @@ suite("test_drop_stats_and_truncate") { assertEquals(3, columns.size()) sql """truncate table non_part""" + stats_dropped("non_part") result = sql """show column stats non_part""" assertEquals(0, result.size()) result = sql """show table stats non_part""" @@ -147,6 +164,7 @@ suite("test_drop_stats_and_truncate") { assertEquals(9, columns.size()) sql """truncate table part""" + stats_dropped("part") result = sql """show column stats part""" assertEquals(0, result.size()) result = sql """show table stats part"""