Skip to content

Commit

Permalink
[improvement](statistics)Async drop table stats while doing truncate …
Browse files Browse the repository at this point in the history
…and schema change(#45923) (#46010)

backport: #45923
  • Loading branch information
Jibing-Li authored Dec 26, 2024
1 parent 0c13cf8 commit 4a07efe
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class AnalysisManager implements Writable {
private StatisticsCache statisticsCache;

private AnalysisTaskExecutor taskExecutor;
private ThreadPoolExecutor dropStatsExecutors;

// Store task information in metadata.
protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
Expand All @@ -136,8 +137,13 @@ public class AnalysisManager implements Writable {
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
Integer.MAX_VALUE);
Integer.MAX_VALUE, "Manual Analysis Job Executor");
this.statisticsCache = new StatisticsCache();
this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
1, 3, 10,
TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
new ThreadPoolExecutor.DiscardPolicy(),
"Drop stats executor", true);
}
}

Expand Down Expand Up @@ -656,19 +662,52 @@ public void dropStats(TableIf table) {
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
Set<String> cols = table.getSchemaAllIndexes(false).stream().map(Column::getName)
.collect(Collectors.toSet());
invalidateLocalStats(catalogId, dbId, tableId, null, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
StatisticsRepository.dropStatisticsByColNames(catalogId, dbId, table.getId(), cols);
asyncDropStatsTask(table, catalogId, dbId, tableId, tableStats);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
}

/**
 * Runnable that performs the actual statistics clean-up for one table so the
 * caller does not have to wait for it.
 *
 * <p>When run it, in order: calls {@code removeTableStats} for the table id,
 * writes a {@code TableStatsDeletionLog} to the edit log, drops the stored
 * statistics of every column name found across all of the table's indexes, and
 * finally invalidates the local and remote stats. Any failure is logged and
 * swallowed; nothing is propagated to the submitting thread.
 */
class DropStatsTask implements Runnable {
    private final TableIf table;
    private final TableStatsMeta tableStats;
    private final long catalogId;
    private final long dbId;
    private final long tableId;

    /**
     * @param table      table whose statistics are dropped; also the source of the column names
     * @param catalogId  id of the catalog that owns the table
     * @param dbId       id of the database that owns the table
     * @param tableId    id of the table
     * @param tableStats table-level stats meta handed to the local invalidation step
     */
    public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, TableStatsMeta tableStats) {
        this.table = table;
        this.tableStats = tableStats;
        this.catalogId = catalogId;
        this.dbId = dbId;
        this.tableId = tableId;
    }

    @Override
    public void run() {
        try {
            // Forget the table-level stats first and record the deletion in the edit log.
            removeTableStats(tableId);
            Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));

            // Column names are collected over all indexes and de-duplicated by the set.
            Set<String> columnNames = table.getSchemaAllIndexes(false)
                    .stream()
                    .map(column -> column.getName())
                    .collect(Collectors.toSet());

            StatisticsRepository.dropStatisticsByColNames(catalogId, dbId, table.getId(), columnNames);
            invalidateLocalStats(catalogId, dbId, tableId, null, tableStats);
            // Drop stats ddl is master only operation.
            invalidateRemoteStats(catalogId, dbId, tableId, columnNames, true);
        } catch (Throwable failure) {
            LOG.warn("Failed to drop stats for table {}", table.getName(), failure);
        }
    }
}

/**
 * Hands a {@link DropStatsTask} for the given table to the drop-stats executor
 * and returns without waiting for it to run.
 *
 * <p>This method never throws: if the executor is missing or the submission
 * fails, the problem is logged and the request is dropped.
 *
 * @param table      table whose statistics should be dropped
 * @param catalogId  id of the catalog that owns the table
 * @param dbId       id of the database that owns the table
 * @param tableId    id of the table
 * @param tableStats table-level stats meta passed through to the task
 */
public void asyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId, TableStatsMeta tableStats) {
    if (dropStatsExecutors == null) {
        // The executor is only created when the manager is constructed outside the checkpoint thread.
        LOG.warn("Drop stats executor is not initialized, skip dropping stats for table {}", tableId);
        return;
    }
    try {
        dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, tableStats));
    } catch (Throwable t) {
        // Pass the throwable as the trailing argument so the stack trace is kept,
        // and log at WARN like the other drop-stats failure handlers in this class.
        LOG.warn("Failed to submit async drop stats job. reason: {}", t.getMessage(), t);
    }
}

public void dropCachedStats(long catalogId, long dbId, long tableId) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor");
}

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) {
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) {
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
"Analysis Job Executor", true);
poolName, true);
cancelExpiredTask();
} else {
executors = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
public StatisticsAutoCollector() {
super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10));
this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
StatisticConstants.TASK_QUEUE_CAP);
StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor");
}

@Override
Expand Down
20 changes: 20 additions & 0 deletions regression-test/suites/statistics/analyze_stats.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ import java.util.stream.Collectors

suite("test_analyze") {

// Waits until the stats of `table` have been dropped, then asserts that they were.
// Both `show column cached stats` and `show column stats` must return no rows.
// Polls up to 120 times, one second apart; every query result is checked, and no
// sleep or query is issued after the final check.
def stats_dropped = { table ->
    boolean dropped = false
    for (int attempt = 0; attempt < 120 && !dropped; attempt++) {
        if (attempt > 0) {
            Thread.sleep(1000)
        }
        def cachedStats = sql """show column cached stats $table"""
        def columnStats = sql """show column stats $table"""
        dropped = (0 == cachedStats.size() && 0 == columnStats.size())
    }
    assertTrue(dropped)
}

String db = "test_analyze"

String tbl = "analyzetestlimited_duplicate_all"
Expand Down Expand Up @@ -1159,6 +1175,8 @@ PARTITION `p599` VALUES IN (599)
ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL;
"""

stats_dropped("analyze_test_with_schema_update")

sql """
ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
"""
Expand Down Expand Up @@ -1356,6 +1374,7 @@ PARTITION `p599` VALUES IN (599)
def result_before_truncate = sql """show column stats ${tbl}"""
assertEquals(14, result_before_truncate.size())
sql """TRUNCATE TABLE ${tbl}"""
stats_dropped(tbl)
def result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
result_after_truncate = sql """show column cached stats ${tbl}"""
Expand All @@ -1382,6 +1401,7 @@ PARTITION `p599` VALUES IN (599)
assert "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111" == truncate_test_result[0][8].substring(1, 1025)

sql """TRUNCATE TABLE ${tbl}"""
stats_dropped(tbl)
result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
sql """ANALYZE TABLE ${tbl} WITH SYNC"""
Expand Down
18 changes: 18 additions & 0 deletions regression-test/suites/statistics/test_analyze_mv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ suite("test_analyze_mv") {
assertTrue(found)
}

// Waits until the stats of `table` have been dropped, then asserts that they were.
// Both `show column cached stats` and `show column stats` must return no rows.
// Polls up to 120 times, one second apart; every query result is checked, and no
// sleep or query is issued after the final check.
def stats_dropped = { table ->
    boolean dropped = false
    for (int attempt = 0; attempt < 120 && !dropped; attempt++) {
        if (attempt > 0) {
            Thread.sleep(1000)
        }
        def cachedStats = sql """show column cached stats $table"""
        def columnStats = sql """show column stats $table"""
        dropped = (0 == cachedStats.size() && 0 == columnStats.size())
    }
    assertTrue(dropped)
}

sql """drop database if exists test_analyze_mv"""
sql """create database test_analyze_mv"""
sql """use test_analyze_mv"""
Expand Down Expand Up @@ -679,6 +695,7 @@ suite("test_analyze_mv") {
// Test row count report and report for nereids
sql """truncate table mvTestDup"""
result_row = sql """show index stats mvTestDup mv3"""
stats_dropped("mvTestDup")
assertEquals(1, result_row.size())
assertEquals("mvTestDup", result_row[0][0])
assertEquals("mv3", result_row[0][1])
Expand Down Expand Up @@ -714,6 +731,7 @@ suite("test_analyze_mv") {

// ** Embedded test for skip auto analyze when table is empty again
sql """analyze table mvTestDup properties ("use.auto.analyzer" = "true")"""
stats_dropped("mvTestDup")
empty_test = sql """show auto analyze mvTestDup"""
assertEquals(0, empty_test.size())
empty_test = sql """show column stats mvTestDup"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@

suite("test_drop_stats_and_truncate") {

// Waits until the stats of `table` have been dropped, then asserts that they were.
// Both `show column cached stats` and `show column stats` must return no rows.
// Polls up to 120 times, one second apart; every query result is checked, and no
// sleep or query is issued after the final check.
def stats_dropped = { table ->
    boolean dropped = false
    for (int attempt = 0; attempt < 120 && !dropped; attempt++) {
        if (attempt > 0) {
            Thread.sleep(1000)
        }
        def cachedStats = sql """show column cached stats $table"""
        def columnStats = sql """show column stats $table"""
        dropped = (0 == cachedStats.size() && 0 == columnStats.size())
    }
    assertTrue(dropped)
}

sql """drop database if exists test_drop_stats_and_truncate"""
sql """create database test_drop_stats_and_truncate"""
sql """use test_drop_stats_and_truncate"""
Expand Down Expand Up @@ -100,6 +116,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(3, columns.size())

sql """truncate table non_part"""
stats_dropped("non_part")
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
Expand Down Expand Up @@ -147,6 +164,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(9, columns.size())

sql """truncate table part"""
stats_dropped("part")
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
Expand Down

0 comments on commit 4a07efe

Please sign in to comment.