From f15a53058383ef9131dabf026da9d2233cdefdcd Mon Sep 17 00:00:00 2001 From: Youngwb Date: Wed, 23 Oct 2024 19:56:57 +0800 Subject: [PATCH] [Enhancement] skip trigger analyze when table not update (#52203) --- .../java/com/starrocks/common/Config.java | 4 +- .../statistics/ConnectorAnalyzeTask.java | 84 +++++++++++------- .../starrocks/statistic/StatisticUtils.java | 12 +++ .../StatisticsCollectJobFactory.java | 8 +- .../statistics/ConnectorAnalyzeTaskTest.java | 85 ++++++++++++++++++- 5 files changed, 151 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 9d500fabbcd8e..b003c48cede40 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -2110,10 +2110,10 @@ public class Config extends ConfigBase { public static long connector_table_query_trigger_analyze_small_table_rows = 10000000; // 10M @ConfField(mutable = true) - public static long connector_table_query_trigger_analyze_small_table_interval = 6 * 60 * 60; // unit: second, default 6h + public static long connector_table_query_trigger_analyze_small_table_interval = 2 * 3600; // unit: second, default 2h @ConfField(mutable = true) - public static long connector_table_query_trigger_analyze_large_table_interval = 24 * 60 * 60; // unit: second, default 24h + public static long connector_table_query_trigger_analyze_large_table_interval = 12 * 3600; // unit: second, default 12h @ConfField(mutable = true) public static int connector_table_query_trigger_analyze_max_running_task_num = 2; diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/statistics/ConnectorAnalyzeTask.java b/fe/fe-core/src/main/java/com/starrocks/connector/statistics/ConnectorAnalyzeTask.java index 4273d67b9e33e..08f23d2ea5ca4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/statistics/ConnectorAnalyzeTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/statistics/ConnectorAnalyzeTask.java @@ -21,11 +21,12 @@ import com.starrocks.catalog.Database; import com.starrocks.catalog.Table; import com.starrocks.catalog.Type; -import com.starrocks.common.Config; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; import com.starrocks.statistic.AnalyzeStatus; +import com.starrocks.statistic.ColumnStatsMeta; import com.starrocks.statistic.ExternalAnalyzeStatus; +import com.starrocks.statistic.ExternalBasicStatsMeta; import com.starrocks.statistic.StatisticExecutor; import com.starrocks.statistic.StatisticUtils; import com.starrocks.statistic.StatisticsCollectJobFactory; @@ -35,9 +36,11 @@ import org.apache.logging.log4j.Logger; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -48,7 +51,7 @@ public class ConnectorAnalyzeTask { private final String catalogName; private final Database db; private final Table table; - private final Set columns; + private Set columns; public ConnectorAnalyzeTask(Triple tableTriple, Set columns) { this.catalogName = Preconditions.checkNotNull(tableTriple.getLeft()); @@ -69,16 +72,17 @@ public void removeColumns(Set columns) { this.columns.removeAll(columns); } - private boolean skipAnalyzeUseLastUpdateTime(LocalDateTime lastUpdateTime) { - // do not know the table row count, use small table analyze interval - if (lastUpdateTime.plusSeconds(Config.connector_table_query_trigger_analyze_small_table_interval). - isAfter(LocalDateTime.now())) { - LOG.info("Table {}.{}.{} is already analyzed at {}, skip it", catalogName, db.getFullName(), - table.getName(), lastUpdateTime); - return true; - } else { - return false; + private boolean needAnalyze(String column, LocalDateTime lastAnalyzedTime) { + LocalDateTime tableUpdateTime = StatisticUtils.getTableLastUpdateTime(table); + // check table update time is after last analyzed time + if (tableUpdateTime != null) { + if (lastAnalyzedTime.isAfter(tableUpdateTime)) { + LOG.info("Table {}.{}.{} column {} last update time: {}, last analyzed time: {}, skip analyze", + catalogName, db.getFullName(), table.getName(), column, tableUpdateTime, lastAnalyzedTime); + return false; + } } + return true; } public Optional run() { @@ -91,36 +95,51 @@ public Optional run() { .filter(status -> !status.getStatus().equals(StatsConstants.ScheduleStatus.FAILED)) .max(Comparator.comparing(ExternalAnalyzeStatus::getStartTime)); if (lastAnalyzedStatus.isPresent()) { + // Do not analyze the table if the last analyze status is PENDING or RUNNING StatsConstants.ScheduleStatus lastScheduleStatus = lastAnalyzedStatus.get().getStatus(); if (lastScheduleStatus == StatsConstants.ScheduleStatus.PENDING || lastScheduleStatus == StatsConstants.ScheduleStatus.RUNNING) { LOG.info("Table {}.{}.{} analyze status is {}, skip it", catalogName, db.getFullName(), table.getName(), lastScheduleStatus); return Optional.empty(); - } else { - // analyze status is Finished - // check the analyzed columns - List analyzedColumns = lastAnalyzedStatus.get().getColumns(); - if (analyzedColumns == null || analyzedColumns.isEmpty()) { - // analyzed all columns in last analyzed time, check the update time - if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) { - return Optional.empty(); - } - } else { - Set lastAnalyzedColumnsSet = new HashSet<>(analyzedColumns); - if (lastAnalyzedColumnsSet.containsAll(columns)) { - if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) { - return Optional.empty(); - } + } + } + + ExternalBasicStatsMeta externalBasicStatsMeta = GlobalStateMgr.getCurrentState().getAnalyzeMgr(). + getExternalTableBasicStatsMeta(catalogName, db.getFullName(), table.getName()); + Optional lastEarliestAnalyzedTime = Optional.empty(); + if (externalBasicStatsMeta != null) { + Map columnLastAnalyzedTime = externalBasicStatsMeta.getColumnStatsMetaMap().values() + .stream().collect( + Collectors.toMap(ColumnStatsMeta::getColumnName, ColumnStatsMeta::getUpdateTime)); + Set needAnalyzeColumns = new HashSet<>(columns); + + for (String column : columns) { + if (columnLastAnalyzedTime.containsKey(column)) { + LocalDateTime lastAnalyzedTime = columnLastAnalyzedTime.get(column); + Preconditions.checkNotNull(lastAnalyzedTime, "Last analyzed time is null"); + if (needAnalyze(column, lastAnalyzedTime)) { + // need analyze columns, compare the last analyzed time, get the earliest time + lastEarliestAnalyzedTime = lastEarliestAnalyzedTime. + map(localDateTime -> localDateTime.isAfter(lastAnalyzedTime) ? lastAnalyzedTime : localDateTime). + or(() -> Optional.of(lastAnalyzedTime)); } else { - if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) { - columns.removeAll(lastAnalyzedColumnsSet); - } + needAnalyzeColumns.remove(column); } } } + columns = needAnalyzeColumns; + } + + if (columns.isEmpty()) { + LOG.info("Table {}.{}.{} columns {} are all up to date, skip analyze", catalogName, db.getFullName(), + table.getName(), columns.stream().map(Object::toString).collect(Collectors.joining(","))); + return Optional.empty(); } + Set updatedPartitions = StatisticUtils.getUpdatedPartitionNames(table, + lastEarliestAnalyzedTime.orElse(LocalDateTime.MIN)); + // Init new analyze status AnalyzeStatus analyzeStatus = new ExternalAnalyzeStatus(GlobalStateMgr.getCurrentState().getNextId(), catalogName, db.getOriginName(), table.getName(), @@ -134,13 +153,14 @@ public Optional run() { ConnectContext statsConnectCtx = StatisticUtils.buildConnectContext(); statsConnectCtx.setThreadLocalInfo(); try { - return Optional.of(executeAnalyze(statsConnectCtx, analyzeStatus)); + return Optional.of(executeAnalyze(statsConnectCtx, analyzeStatus, updatedPartitions)); } finally { ConnectContext.remove(); } } - public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus) { + public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus, + Set updatedPartitions) { List columnNames = Lists.newArrayList(columns); List columnTypes = columnNames.stream().map(col -> { Column column = table.getColumn(col); @@ -150,7 +170,7 @@ public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatu StatisticExecutor statisticExecutor = new StatisticExecutor(); return statisticExecutor.collectStatistics(statsConnectCtx, StatisticsCollectJobFactory.buildExternalStatisticsCollectJob( - catalogName, db, table, null, + catalogName, db, table, updatedPartitions == null ? null : new ArrayList<>(updatedPartitions), columnNames, columnTypes, StatsConstants.AnalyzeType.FULL, StatsConstants.ScheduleType.ONCE, Maps.newHashMap()), diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index 3bc4f13c62b75..bb2d1eb4c2f8b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -73,6 +73,7 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static com.starrocks.sql.optimizer.Utils.getLongFromDateTime; @@ -225,6 +226,17 @@ public static LocalDateTime getTableLastUpdateTime(Table table) { } } + public static Set getUpdatedPartitionNames(Table table, LocalDateTime checkTime) { + // get updated partitions + Set updatedPartitions = null; + try { + updatedPartitions = ConnectorPartitionTraits.build(table).getUpdatedPartitionNames(checkTime, 60); + } catch (Exception e) { + // ConnectorPartitionTraits do not support all type of table, ignore exception + } + return updatedPartitions; + } + public static LocalDateTime getPartitionLastUpdateTime(Partition partition) { long time = partition.getVisibleVersionTime(); return LocalDateTime.ofInstant(Instant.ofEpochMilli(time), Clock.systemDefaultZone().getZone()); diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java index 9b91a6132f952..9e51e279380a3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticsCollectJobFactory.java @@ -23,7 +23,6 @@ import com.starrocks.catalog.Table; import com.starrocks.catalog.Type; import com.starrocks.common.Config; -import com.starrocks.connector.ConnectorPartitionTraits; import com.starrocks.connector.statistics.ConnectorTableColumnStats; import com.starrocks.monitor.unit.ByteSizeUnit; import com.starrocks.server.GlobalStateMgr; @@ -310,12 +309,7 @@ private static void createExternalFullStatsJob(List allTab Database db, Table table, List columnNames, List columnTypes) { // get updated partitions - Set updatedPartitions = null; - try { - updatedPartitions = ConnectorPartitionTraits.build(table).getUpdatedPartitionNames(statisticsUpdateTime, 60); - } catch (Exception e) { - // ConnectorPartitionTraits do not support all type of table, ignore exception - } + Set updatedPartitions = StatisticUtils.getUpdatedPartitionNames(table, statisticsUpdateTime); LOG.info("create external full statistics job for table: {}, partitions: {}", table.getName(), updatedPartitions); allTableJobMap.add(buildExternalStatisticsCollectJob(job.getCatalogName(), db, table, diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/statistics/ConnectorAnalyzeTaskTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/statistics/ConnectorAnalyzeTaskTest.java index 2667a868c86fe..b749ac817d750 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/statistics/ConnectorAnalyzeTaskTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/statistics/ConnectorAnalyzeTaskTest.java @@ -22,19 +22,26 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.plan.ConnectorPlanTestBase; import com.starrocks.statistic.AnalyzeStatus; +import com.starrocks.statistic.ColumnStatsMeta; import com.starrocks.statistic.ExternalAnalyzeStatus; +import com.starrocks.statistic.ExternalBasicStatsMeta; +import com.starrocks.statistic.ExternalFullStatisticsCollectJob; +import com.starrocks.statistic.StatisticUtils; import com.starrocks.statistic.StatsConstants; import com.starrocks.utframe.UtFrameUtils; import io.trino.hive.$internal.org.apache.commons.lang3.tuple.Triple; import mockit.Mock; import mockit.MockUp; +import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.time.LocalDateTime; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; public class ConnectorAnalyzeTaskTest { private static ConnectContext ctx; @@ -46,6 +53,12 @@ public static void beforeClass() throws Exception { ConnectorPlanTestBase.mockHiveCatalog(ctx); } + @After + public void tearDown() { + GlobalStateMgr.getCurrentState().getAnalyzeMgr().getAnalyzeStatusMap().clear(); + GlobalStateMgr.getCurrentState().getAnalyzeMgr().getExternalBasicStatsMetaMap().clear(); + } + @Test public void testMergeTask() { Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0", @@ -77,16 +90,86 @@ public void testTaskRun() { new MockUp() { @Mock - private AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus) { + private AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus, + Set updatedPartitions) { + analyzeStatus.setStatus(StatsConstants.ScheduleStatus.FINISH); return analyzeStatus; } }; + + new MockUp() { + @Mock + public LocalDateTime getTableLastUpdateTime(Table table) { + return LocalDateTime.now().minusDays(1); + } + }; // execute analyze when last analyze status is finish externalAnalyzeStatus.setStatus(StatsConstants.ScheduleStatus.FINISH); + // add ExternalBasicStatsMeta when analyze finish + ExternalBasicStatsMeta externalBasicStatsMeta = new ExternalBasicStatsMeta("hive0", "partitioned_db", + "orders", List.of("o_custkey", "o_orderstatus"), StatsConstants.AnalyzeType.FULL, + LocalDateTime.now(), Maps.newHashMap()); + externalBasicStatsMeta.addColumnStatsMeta(new ColumnStatsMeta("o_custkey", StatsConstants.AnalyzeType.FULL, + LocalDateTime.now())); + externalBasicStatsMeta.addColumnStatsMeta(new ColumnStatsMeta("o_orderstatus", StatsConstants.AnalyzeType.FULL, + LocalDateTime.now())); + + GlobalStateMgr.getCurrentState().getAnalyzeMgr().addExternalBasicStatsMeta(externalBasicStatsMeta); + result = task1.run(); Assert.assertTrue(result.isPresent()); Assert.assertTrue(result.get() instanceof ExternalAnalyzeStatus); ExternalAnalyzeStatus externalAnalyzeStatusResult = (ExternalAnalyzeStatus) result.get(); Assert.assertEquals(List.of("o_orderkey"), externalAnalyzeStatusResult.getColumns()); } + + @Test + public void testTaskRunWithTableUpdate() { + Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0", + "partitioned_db", "orders"); + String tableUUID = table.getUUID(); + + Triple tableTriple = StatisticsUtils.getTableTripleByUUID(tableUUID); + ConnectorAnalyzeTask task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey")); + new MockUp() { + @Mock + public void collect(ConnectContext context, AnalyzeStatus analyzeStatus) throws Exception { + // do nothing + } + }; + + Optional result = task1.run(); + Assert.assertTrue(result.isPresent()); + Map columnStatsMetaMap = GlobalStateMgr.getCurrentState().getAnalyzeMgr(). + getExternalTableBasicStatsMeta("hive0", "partitioned_db", "orders").getColumnStatsMetaMap(); + Assert.assertEquals(2, columnStatsMetaMap.size()); + Assert.assertTrue(columnStatsMetaMap.containsKey("o_orderkey")); + Assert.assertTrue(columnStatsMetaMap.containsKey("o_custkey")); + + new MockUp() { + @Mock + public LocalDateTime getTableLastUpdateTime(Table table) { + return LocalDateTime.now().minusDays(1); + } + }; + // table not update, skip analyze + task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey")); + result = task1.run(); + Assert.assertTrue(result.isEmpty()); + + // table update, analyze again + new MockUp() { + @Mock + public LocalDateTime getTableLastUpdateTime(Table table) { + return LocalDateTime.now().plusMinutes(10); + } + }; + + task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey")); + result = task1.run(); + Assert.assertTrue(result.isPresent()); + Assert.assertTrue(result.get() instanceof ExternalAnalyzeStatus); + ExternalAnalyzeStatus externalAnalyzeStatusResult = (ExternalAnalyzeStatus) result.get(); + Assert.assertEquals(2, externalAnalyzeStatusResult.getColumns().size()); + } }