Async drop table stats while doing truncate and schema change.
Jibing-Li committed Dec 25, 2024 · 1 parent 6d6ff2d · commit a1eb35b
Showing 8 changed files with 105 additions and 36 deletions.
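At a high level, the diff moves the statistics cleanup that used to run inline during TRUNCATE and schema change onto a dedicated background executor in AnalysisManager. A minimal sketch of the new call path, using only the signatures visible in the diff below (variable names are illustrative):

    // Caller side (e.g. handling the invalidate-stats request for a truncate):
    TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
    analysisManager.submitAsyncDropStatsTask(table, catalogId, dbId, tableId,
            tableStats, partitionNames, /* isMaster */ false);

    // Inside AnalysisManager, a DropStatsTask then runs asynchronously:
    //  - on the master it additionally removes TableStatsMeta, writes the edit log,
    //    drops persisted statistics and invalidates remote caches;
    //  - on every node it invalidates the local statistics cache.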
@@ -113,6 +113,7 @@
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdatePartitionStatsTarget;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
@@ -3395,8 +3396,9 @@ public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request
partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions));
}
if (target.isTruncate) {
analysisManager.submitAsyncDropStatsTask(target.catalogId, target.dbId,
target.tableId, tableStats, partitionNames);
TableIf table = StatisticsUtil.findTable(target.catalogId, target.dbId, target.tableId);
analysisManager.submitAsyncDropStatsTask(table, target.catalogId, target.dbId,
target.tableId, tableStats, partitionNames, false);
} else {
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
@@ -159,12 +159,12 @@ public class AnalysisManager implements Writable {
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
Integer.MAX_VALUE);
Integer.MAX_VALUE, "Manual Analysis Job Executor");
this.statisticsCache = new StatisticsCache();
this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
1, 1, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy(),
1, 3, 10,
TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
new ThreadPoolExecutor.DiscardPolicy(),
"Drop stats executor", true);
}
}
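Note what the new pool configuration above implies: with ThreadPoolExecutor.DiscardPolicy and a 20-slot queue, a burst of drop-stats submissions that overflows the queue is silently dropped instead of throwing RejectedExecutionException as the old AbortPolicy did. A small self-contained JDK sketch of that behavior (illustrative only, not Doris code):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class DiscardPolicyDemo {
        public static void main(String[] args) throws InterruptedException {
            // Same shape as the new drop-stats pool: 1 core thread, up to 3, a queue of 20,
            // and a rejection handler that silently drops whatever does not fit.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 3, 10, TimeUnit.DAYS,
                    new LinkedBlockingQueue<>(20),
                    new ThreadPoolExecutor.DiscardPolicy());
            for (int i = 0; i < 100; i++) {
                pool.submit(() -> {
                    try {
                        Thread.sleep(100); // keep the workers busy so the queue fills up
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            // Fewer than 100 tasks complete; the rest were discarded without any exception.
            System.out.println("completed: " + pool.getCompletedTaskCount());
        }
    }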
@@ -696,20 +709,55 @@ public void dropStats(TableIf table, PartitionNames partitionNames) {
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
if (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
}
submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, partitionNames);
// Drop stats ddl is master only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), null, partitions);
submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
@@ -722,30 +709,55 @@ class DropStatsTask implements Runnable {
private final Set<String> columns;
private final TableStatsMeta tableStats;
private final PartitionNames partitionNames;
private final TableIf table;
private final boolean isMaster;

public DropStatsTask(long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames) {
public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
this.columns = columns;
this.tableStats = tableStats;
this.partitionNames = partitionNames;
this.table = table;
this.isMaster = isMaster;
}

@Override
public void run() {
invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
try {
if (isMaster) {
if (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
}
// Drop stats ddl is master only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar()
&& partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
// Drop stats ddl is master only operation.
StatisticsRepository.dropStatistics(catalogId, dbId, tableId, null, partitions);
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
}
invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
} catch (Throwable t) {
LOG.info("Failed to async drop stats for table {}.{}.{}, reason: {}",
catalogId, dbId, tableId, t.getMessage());
}
}
}

public void submitAsyncDropStatsTask(long catalogId, long dbId, long tableId,
TableStatsMeta tableStats, PartitionNames partitionNames) {
public void submitAsyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId,
TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
try {
dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, tableId, null, tableStats, partitionNames));
dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, null,
tableStats, partitionNames, isMaster));
} catch (Throwable t) {
LOG.info("Failed to drop stats for truncate table {}.{}.{}. Reason:{}",
catalogId, dbId, tableId, t.getMessage());
LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage());
}
}
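Both call sites in this commit now go through this method with the resolved TableIf plus an isMaster flag; a hedged usage sketch based on the two diffs above:

    // Master-side DDL / TRUNCATE path (dropStats above): the background task also
    // clears TableStatsMeta, drops persisted stats and invalidates remote caches.
    submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true);

    // Follower path (invalidateStatsCache handling a truncate): only the local
    // statistics cache needs to be invalidated, so isMaster is false.
    submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, false);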

@@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor");
}

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) {
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) {
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE),
"Analysis Job Executor", true);
poolName, true);
cancelExpiredTask();
} else {
executors = null;
@@ -62,7 +62,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
public StatisticsAutoCollector() {
super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
StatisticConstants.TASK_QUEUE_CAP);
StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor");
}

@Override
@@ -659,11 +659,11 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set<St
AnalysisManager analysisManager = new AnalysisManager();
for (int i = 0; i < 20; i++) {
System.out.println("Submit " + i);
analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null);
analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null, null, false);
}
Thread.sleep(25000);
Thread.sleep(10000);
System.out.println(count.get());
Assertions.assertTrue(count.get() > 10);
Assertions.assertTrue(count.get() < 20);
Assertions.assertTrue(count.get() > 0);
Assertions.assertTrue(count.get() <= 20);
}
}
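The loosened assertions above track the executor change: the old single-thread pool with a 10-slot queue and AbortPolicy guaranteed that some of the 20 submissions were rejected (hence the old 10 < count < 20 bounds), whereas the new 1–3 thread pool with a 20-slot queue and DiscardPolicy may run anywhere from a few to all of them, so the test now only checks that at least one task ran and no more than the 20 submitted did.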
20 changes: 20 additions & 0 deletions regression-test/suites/statistics/analyze_stats.groovy
@@ -19,6 +19,22 @@ import java.util.stream.Collectors

suite("test_analyze") {

def stats_dropped = { table ->
def result1 = sql """show column cached stats $table"""
def result2 = sql """show column stats $table"""
boolean dropped = false
for (int i = 0; i < 120; i++) {
if (0 == result1.size() && 0 == result2.size()) {
dropped = true;
break;
}
Thread.sleep(1000)
result1 = sql """show column cached stats $table"""
result2 = sql """show column stats $table"""
}
assertTrue(dropped)
}
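Because the drop now happens asynchronously, each regression suite in this commit defines the same stats_dropped helper, which polls show column stats and show column cached stats for up to roughly two minutes before asserting; the truncate and schema-change cases below call it before checking that the statistics are gone.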

String db = "test_analyze"

String tbl = "analyzetestlimited_duplicate_all"
@@ -1152,6 +1168,8 @@ PARTITION `p599` VALUES IN (599)
ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL;
"""

stats_dropped("analyze_test_with_schema_update")

sql """
ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
"""
@@ -1349,6 +1367,7 @@ PARTITION `p599` VALUES IN (599)
def result_before_truncate = sql """show column stats ${tbl}"""
assertEquals(14, result_before_truncate.size())
sql """TRUNCATE TABLE ${tbl}"""
stats_dropped(tbl)
def result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
result_after_truncate = sql """show column cached stats ${tbl}"""
@@ -1375,6 +1394,7 @@ PARTITION `p599` VALUES IN (599)
assert "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111" == truncate_test_result[0][8].substring(1, 1025)

sql """TRUNCATE TABLE ${tbl}"""
stats_dropped(tbl)
result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
sql """ANALYZE TABLE ${tbl} WITH SYNC"""
17 changes: 17 additions & 0 deletions regression-test/suites/statistics/test_analyze_mv.groovy
@@ -108,6 +108,22 @@ suite("test_analyze_mv") {
assertTrue(found)
}

def stats_dropped = { table ->
def result1 = sql """show column cached stats $table"""
def result2 = sql """show column stats $table"""
boolean dropped = false
for (int i = 0; i < 120; i++) {
if (0 == result1.size() && 0 == result2.size()) {
dropped = true;
break;
}
Thread.sleep(1000)
result1 = sql """show column cached stats $table"""
result2 = sql """show column stats $table"""
}
assertTrue(dropped)
}

sql """drop database if exists test_analyze_mv"""
sql """create database test_analyze_mv"""
sql """use test_analyze_mv"""
@@ -674,6 +690,7 @@
// * Test row count report and report for nereids
sql """truncate table mvTestDup"""
result_row = sql """show index stats mvTestDup mv3"""
stats_dropped("mvTestDup")
assertEquals(1, result_row.size())
assertEquals("mvTestDup", result_row[0][0])
assertEquals("mv3", result_row[0][1])
@@ -17,6 +17,22 @@

suite("test_drop_stats_and_truncate") {

def stats_dropped = { table ->
def result1 = sql """show column cached stats $table"""
def result2 = sql """show column stats $table"""
boolean dropped = false
for (int i = 0; i < 120; i++) {
if (0 == result1.size() && 0 == result2.size()) {
dropped = true;
break;
}
Thread.sleep(1000)
result1 = sql """show column cached stats $table"""
result2 = sql """show column stats $table"""
}
assertTrue(dropped)
}

sql """drop database if exists test_drop_stats_and_truncate"""
sql """create database test_drop_stats_and_truncate"""
sql """use test_drop_stats_and_truncate"""
@@ -101,6 +117,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(3, columns.size())

sql """truncate table non_part"""
stats_dropped("non_part")
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
@@ -148,6 +165,7 @@
assertEquals(9, columns.size())

sql """truncate table part"""
stats_dropped("part")
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
