[Enhancement] skip trigger analyze when table not update (#52203)
Youngwb authored Oct 23, 2024
1 parent 77ecc91 commit f15a530
Showing 5 changed files with 151 additions and 42 deletions.
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2110,10 +2110,10 @@ public class Config extends ConfigBase {
public static long connector_table_query_trigger_analyze_small_table_rows = 10000000; // 10M

@ConfField(mutable = true)
public static long connector_table_query_trigger_analyze_small_table_interval = 6 * 60 * 60; // unit: second, default 6h
public static long connector_table_query_trigger_analyze_small_table_interval = 2 * 3600; // unit: second, default 2h

@ConfField(mutable = true)
public static long connector_table_query_trigger_analyze_large_table_interval = 24 * 60 * 60; // unit: second, default 24h
public static long connector_table_query_trigger_analyze_large_table_interval = 12 * 3600; // unit: second, default 12h

@ConfField(mutable = true)
public static int connector_table_query_trigger_analyze_max_running_task_num = 2;
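These defaults drop from 6 h to 2 h (small tables) and from 24 h to 12 h (large tables); both remain mutable FE config values. For reference, this is the shape of the interval check that previously gated re-analysis (see the removed skipAnalyzeUseLastUpdateTime further down) — a standalone sketch with illustrative names, not the commit's code:

import java.time.LocalDateTime;

public class AnalyzeIntervalSketch {
    // New defaults from the hunk above, in seconds.
    static final long SMALL_TABLE_INTERVAL_SEC = 2 * 3600;   // 2 h
    static final long LARGE_TABLE_INTERVAL_SEC = 12 * 3600;  // 12 h

    // True while the previous analyze is still considered fresh for a small table.
    static boolean withinSmallTableInterval(LocalDateTime lastAnalyzeTime) {
        return lastAnalyzeTime.plusSeconds(SMALL_TABLE_INTERVAL_SEC).isAfter(LocalDateTime.now());
    }

    public static void main(String[] args) {
        System.out.println(withinSmallTableInterval(LocalDateTime.now().minusHours(1))); // true
        System.out.println(withinSmallTableInterval(LocalDateTime.now().minusHours(3))); // false
    }
}
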
ConnectorAnalyzeTask.java
@@ -21,11 +21,12 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.AnalyzeStatus;
import com.starrocks.statistic.ColumnStatsMeta;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.StatisticExecutor;
import com.starrocks.statistic.StatisticUtils;
import com.starrocks.statistic.StatisticsCollectJobFactory;
@@ -35,9 +36,11 @@
import org.apache.logging.log4j.Logger;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -48,7 +51,7 @@ public class ConnectorAnalyzeTask {
private final String catalogName;
private final Database db;
private final Table table;
private final Set<String> columns;
private Set<String> columns;

public ConnectorAnalyzeTask(Triple<String, Database, Table> tableTriple, Set<String> columns) {
this.catalogName = Preconditions.checkNotNull(tableTriple.getLeft());
@@ -69,16 +72,17 @@ public void removeColumns(Set<String> columns) {
this.columns.removeAll(columns);
}

private boolean skipAnalyzeUseLastUpdateTime(LocalDateTime lastUpdateTime) {
// do not know the table row count, use small table analyze interval
if (lastUpdateTime.plusSeconds(Config.connector_table_query_trigger_analyze_small_table_interval).
isAfter(LocalDateTime.now())) {
LOG.info("Table {}.{}.{} is already analyzed at {}, skip it", catalogName, db.getFullName(),
table.getName(), lastUpdateTime);
return true;
} else {
return false;
private boolean needAnalyze(String column, LocalDateTime lastAnalyzedTime) {
LocalDateTime tableUpdateTime = StatisticUtils.getTableLastUpdateTime(table);
// check table update time is after last analyzed time
if (tableUpdateTime != null) {
if (lastAnalyzedTime.isAfter(tableUpdateTime)) {
LOG.info("Table {}.{}.{} column {} last update time: {}, last analyzed time: {}, skip analyze",
catalogName, db.getFullName(), table.getName(), column, tableUpdateTime, lastAnalyzedTime);
return false;
}
}
return true;
}

public Optional<AnalyzeStatus> run() {
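The core of this change is the needAnalyze check above: a column is re-analyzed only if the table has been updated since that column's statistics were last collected, or if the table's update time cannot be determined. A self-contained restatement of that decision, with the two timestamps passed in directly (illustrative class and parameter names, not the class's API):

import java.time.LocalDateTime;

public class NeedAnalyzeSketch {
    // Mirrors needAnalyze above: skip only when the stats are newer than the data.
    static boolean needAnalyze(LocalDateTime tableUpdateTime, LocalDateTime lastAnalyzedTime) {
        if (tableUpdateTime != null && lastAnalyzedTime.isAfter(tableUpdateTime)) {
            return false; // statistics collected after the last table update, skip
        }
        return true; // unknown update time, or the table changed after the last analyze
    }

    public static void main(String[] args) {
        LocalDateTime updated = LocalDateTime.of(2024, 10, 22, 8, 0);
        System.out.println(needAnalyze(updated, LocalDateTime.of(2024, 10, 22, 12, 0))); // false -> skip
        System.out.println(needAnalyze(updated, LocalDateTime.of(2024, 10, 21, 12, 0))); // true  -> analyze
        System.out.println(needAnalyze(null, LocalDateTime.of(2024, 10, 21, 12, 0)));    // true  -> analyze
    }
}
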
@@ -91,36 +95,51 @@ public Optional<AnalyzeStatus> run() {
.filter(status -> !status.getStatus().equals(StatsConstants.ScheduleStatus.FAILED))
.max(Comparator.comparing(ExternalAnalyzeStatus::getStartTime));
if (lastAnalyzedStatus.isPresent()) {
// Do not analyze the table if the last analyze status is PENDING or RUNNING
StatsConstants.ScheduleStatus lastScheduleStatus = lastAnalyzedStatus.get().getStatus();
if (lastScheduleStatus == StatsConstants.ScheduleStatus.PENDING ||
lastScheduleStatus == StatsConstants.ScheduleStatus.RUNNING) {
LOG.info("Table {}.{}.{} analyze status is {}, skip it", catalogName, db.getFullName(),
table.getName(), lastScheduleStatus);
return Optional.empty();
} else {
// analyze status is Finished
// check the analyzed columns
List<String> analyzedColumns = lastAnalyzedStatus.get().getColumns();
if (analyzedColumns == null || analyzedColumns.isEmpty()) {
// analyzed all columns in last analyzed time, check the update time
if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) {
return Optional.empty();
}
} else {
Set<String> lastAnalyzedColumnsSet = new HashSet<>(analyzedColumns);
if (lastAnalyzedColumnsSet.containsAll(columns)) {
if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) {
return Optional.empty();
}
}
}

ExternalBasicStatsMeta externalBasicStatsMeta = GlobalStateMgr.getCurrentState().getAnalyzeMgr().
getExternalTableBasicStatsMeta(catalogName, db.getFullName(), table.getName());
Optional<LocalDateTime> lastEarliestAnalyzedTime = Optional.empty();
if (externalBasicStatsMeta != null) {
Map<String, LocalDateTime> columnLastAnalyzedTime = externalBasicStatsMeta.getColumnStatsMetaMap().values()
.stream().collect(
Collectors.toMap(ColumnStatsMeta::getColumnName, ColumnStatsMeta::getUpdateTime));
Set<String> needAnalyzeColumns = new HashSet<>(columns);

for (String column : columns) {
if (columnLastAnalyzedTime.containsKey(column)) {
LocalDateTime lastAnalyzedTime = columnLastAnalyzedTime.get(column);
Preconditions.checkNotNull(lastAnalyzedTime, "Last analyzed time is null");
if (needAnalyze(column, lastAnalyzedTime)) {
// need analyze columns, compare the last analyzed time, get the earliest time
lastEarliestAnalyzedTime = lastEarliestAnalyzedTime.
map(localDateTime -> localDateTime.isAfter(lastAnalyzedTime) ? lastAnalyzedTime : localDateTime).
or(() -> Optional.of(lastAnalyzedTime));
} else {
if (skipAnalyzeUseLastUpdateTime(lastAnalyzedStatus.get().getStartTime())) {
columns.removeAll(lastAnalyzedColumnsSet);
}
needAnalyzeColumns.remove(column);
}
}
}
columns = needAnalyzeColumns;
}

if (columns.isEmpty()) {
LOG.info("Table {}.{}.{} columns {} are all up to date, skip analyze", catalogName, db.getFullName(),
table.getName(), columns.stream().map(Object::toString).collect(Collectors.joining(",")));
return Optional.empty();
}

Set<String> updatedPartitions = StatisticUtils.getUpdatedPartitionNames(table,
lastEarliestAnalyzedTime.orElse(LocalDateTime.MIN));

// Init new analyze status
AnalyzeStatus analyzeStatus = new ExternalAnalyzeStatus(GlobalStateMgr.getCurrentState().getNextId(),
catalogName, db.getOriginName(), table.getName(),
@@ -134,13 +153,14 @@
ConnectContext statsConnectCtx = StatisticUtils.buildConnectContext();
statsConnectCtx.setThreadLocalInfo();
try {
return Optional.of(executeAnalyze(statsConnectCtx, analyzeStatus));
return Optional.of(executeAnalyze(statsConnectCtx, analyzeStatus, updatedPartitions));
} finally {
ConnectContext.remove();
}
}

public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus) {
public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus,
Set<String> updatedPartitions) {
List<String> columnNames = Lists.newArrayList(columns);
List<Type> columnTypes = columnNames.stream().map(col -> {
Column column = table.getColumn(col);
@@ -150,7 +170,7 @@ public AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatu
StatisticExecutor statisticExecutor = new StatisticExecutor();
return statisticExecutor.collectStatistics(statsConnectCtx,
StatisticsCollectJobFactory.buildExternalStatisticsCollectJob(
catalogName, db, table, null,
catalogName, db, table, updatedPartitions == null ? null : new ArrayList<>(updatedPartitions),
columnNames, columnTypes,
StatsConstants.AnalyzeType.FULL,
StatsConstants.ScheduleType.ONCE, Maps.newHashMap()),
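Taken together, the reworked run() above prunes every column whose statistics are newer than the table's last update, remembers the earliest last-analyzed time among the columns that remain, and uses that time to restrict the collect job to updated partitions. A simplified, standalone sketch of that pruning step (all names illustrative):

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class StaleColumnPruningSketch {
    public static void main(String[] args) {
        LocalDateTime tableUpdateTime = LocalDateTime.now().minusHours(3);
        Map<String, LocalDateTime> columnLastAnalyzedTime = new HashMap<>();
        columnLastAnalyzedTime.put("o_orderkey", LocalDateTime.now().minusDays(2)); // stale stats
        columnLastAnalyzedTime.put("o_custkey", LocalDateTime.now().minusHours(1)); // fresh stats

        Set<String> needAnalyzeColumns = new HashSet<>(columnLastAnalyzedTime.keySet());
        Optional<LocalDateTime> lastEarliestAnalyzedTime = Optional.empty();
        for (Map.Entry<String, LocalDateTime> entry : columnLastAnalyzedTime.entrySet()) {
            LocalDateTime lastAnalyzedTime = entry.getValue();
            if (lastAnalyzedTime.isAfter(tableUpdateTime)) {
                needAnalyzeColumns.remove(entry.getKey()); // stats newer than the data, skip column
                continue;
            }
            // Track the earliest analyze time among columns that still need work.
            lastEarliestAnalyzedTime = lastEarliestAnalyzedTime
                    .map(t -> t.isAfter(lastAnalyzedTime) ? lastAnalyzedTime : t)
                    .or(() -> Optional.of(lastAnalyzedTime));
        }

        System.out.println(needAnalyzeColumns);                                 // [o_orderkey]
        System.out.println(lastEarliestAnalyzedTime.orElse(LocalDateTime.MIN)); // roughly two days ago
    }
}
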
StatisticUtils.java
@@ -73,6 +73,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.sql.optimizer.Utils.getLongFromDateTime;
@@ -225,6 +226,17 @@ public static LocalDateTime getTableLastUpdateTime(Table table) {
}
}

public static Set<String> getUpdatedPartitionNames(Table table, LocalDateTime checkTime) {
// get updated partitions
Set<String> updatedPartitions = null;
try {
updatedPartitions = ConnectorPartitionTraits.build(table).getUpdatedPartitionNames(checkTime, 60);
} catch (Exception e) {
// ConnectorPartitionTraits do not support all type of table, ignore exception
}
return updatedPartitions;
}
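getUpdatedPartitionNames deliberately swallows the exception and returns null for table types that ConnectorPartitionTraits cannot handle; the call sites (executeAnalyze above and createExternalFullStatsJob below) then pass null through, which effectively means "no partition filter". A minimal sketch of that null contract (illustrative helper, not part of the commit):

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PartitionFilterSketch {
    // null in means "unknown or unsupported"; null out means "no partition filter".
    static List<String> toPartitionFilter(Set<String> updatedPartitions) {
        return updatedPartitions == null ? null : new ArrayList<>(updatedPartitions);
    }

    public static void main(String[] args) {
        System.out.println(toPartitionFilter(null));                             // null -> full-table analyze
        System.out.println(toPartitionFilter(Set.of("o_orderdate=2024-10-23"))); // [o_orderdate=2024-10-23]
    }
}
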

public static LocalDateTime getPartitionLastUpdateTime(Partition partition) {
long time = partition.getVisibleVersionTime();
return LocalDateTime.ofInstant(Instant.ofEpochMilli(time), Clock.systemDefaultZone().getZone());
StatisticsCollectJobFactory.java
@@ -23,7 +23,6 @@
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.connector.ConnectorPartitionTraits;
import com.starrocks.connector.statistics.ConnectorTableColumnStats;
import com.starrocks.monitor.unit.ByteSizeUnit;
import com.starrocks.server.GlobalStateMgr;
@@ -310,12 +309,7 @@ private static void createExternalFullStatsJob(List<StatisticsCollectJob> allTab
Database db, Table table, List<String> columnNames,
List<Type> columnTypes) {
// get updated partitions
Set<String> updatedPartitions = null;
try {
updatedPartitions = ConnectorPartitionTraits.build(table).getUpdatedPartitionNames(statisticsUpdateTime, 60);
} catch (Exception e) {
// ConnectorPartitionTraits do not support all type of table, ignore exception
}
Set<String> updatedPartitions = StatisticUtils.getUpdatedPartitionNames(table, statisticsUpdateTime);
LOG.info("create external full statistics job for table: {}, partitions: {}",
table.getName(), updatedPartitions);
allTableJobMap.add(buildExternalStatisticsCollectJob(job.getCatalogName(), db, table,
ConnectorAnalyzeTaskTest.java
@@ -22,19 +22,26 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.plan.ConnectorPlanTestBase;
import com.starrocks.statistic.AnalyzeStatus;
import com.starrocks.statistic.ColumnStatsMeta;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.ExternalFullStatisticsCollectJob;
import com.starrocks.statistic.StatisticUtils;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.utframe.UtFrameUtils;
import io.trino.hive.$internal.org.apache.commons.lang3.tuple.Triple;
import mockit.Mock;
import mockit.MockUp;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ConnectorAnalyzeTaskTest {
private static ConnectContext ctx;
@@ -46,6 +53,12 @@ public static void beforeClass() throws Exception {
ConnectorPlanTestBase.mockHiveCatalog(ctx);
}

@After
public void tearDown() {
GlobalStateMgr.getCurrentState().getAnalyzeMgr().getAnalyzeStatusMap().clear();
GlobalStateMgr.getCurrentState().getAnalyzeMgr().getExternalBasicStatsMetaMap().clear();
}

@Test
public void testMergeTask() {
Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
@@ -77,16 +90,86 @@ public void testTaskRun() {

new MockUp<ConnectorAnalyzeTask>() {
@Mock
private AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus) {
private AnalyzeStatus executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStatus analyzeStatus,
Set<String> updatedPartitions) {
analyzeStatus.setStatus(StatsConstants.ScheduleStatus.FINISH);
return analyzeStatus;
}
};

new MockUp<StatisticUtils>() {
@Mock
public LocalDateTime getTableLastUpdateTime(Table table) {
return LocalDateTime.now().minusDays(1);
}
};
// execute analyze when last analyze status is finish
externalAnalyzeStatus.setStatus(StatsConstants.ScheduleStatus.FINISH);
// add ExternalBasicStatsMeta when analyze finish
ExternalBasicStatsMeta externalBasicStatsMeta = new ExternalBasicStatsMeta("hive0", "partitioned_db",
"orders", List.of("o_custkey", "o_orderstatus"), StatsConstants.AnalyzeType.FULL,
LocalDateTime.now(), Maps.newHashMap());
externalBasicStatsMeta.addColumnStatsMeta(new ColumnStatsMeta("o_custkey", StatsConstants.AnalyzeType.FULL,
LocalDateTime.now()));
externalBasicStatsMeta.addColumnStatsMeta(new ColumnStatsMeta("o_orderstatus", StatsConstants.AnalyzeType.FULL,
LocalDateTime.now()));

GlobalStateMgr.getCurrentState().getAnalyzeMgr().addExternalBasicStatsMeta(externalBasicStatsMeta);

result = task1.run();
Assert.assertTrue(result.isPresent());
Assert.assertTrue(result.get() instanceof ExternalAnalyzeStatus);
ExternalAnalyzeStatus externalAnalyzeStatusResult = (ExternalAnalyzeStatus) result.get();
Assert.assertEquals(List.of("o_orderkey"), externalAnalyzeStatusResult.getColumns());
}

@Test
public void testTaskRunWithTableUpdate() {
Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable("hive0",
"partitioned_db", "orders");
String tableUUID = table.getUUID();

Triple<String, Database, Table> tableTriple = StatisticsUtils.getTableTripleByUUID(tableUUID);
ConnectorAnalyzeTask task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey"));
new MockUp<ExternalFullStatisticsCollectJob>() {
@Mock
public void collect(ConnectContext context, AnalyzeStatus analyzeStatus) throws Exception {
// do nothing
}
};

Optional<AnalyzeStatus> result = task1.run();
Assert.assertTrue(result.isPresent());
Map<String, ColumnStatsMeta> columnStatsMetaMap = GlobalStateMgr.getCurrentState().getAnalyzeMgr().
getExternalTableBasicStatsMeta("hive0", "partitioned_db", "orders").getColumnStatsMetaMap();
Assert.assertEquals(2, columnStatsMetaMap.size());
Assert.assertTrue(columnStatsMetaMap.containsKey("o_orderkey"));
Assert.assertTrue(columnStatsMetaMap.containsKey("o_custkey"));

new MockUp<StatisticUtils>() {
@Mock
public LocalDateTime getTableLastUpdateTime(Table table) {
return LocalDateTime.now().minusDays(1);
}
};
// table not update, skip analyze
task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey"));
result = task1.run();
Assert.assertTrue(result.isEmpty());

// table update, analyze again
new MockUp<StatisticUtils>() {
@Mock
public LocalDateTime getTableLastUpdateTime(Table table) {
return LocalDateTime.now().plusMinutes(10);
}
};

task1 = new ConnectorAnalyzeTask(tableTriple, Sets.newHashSet("o_orderkey", "o_custkey"));
result = task1.run();
Assert.assertTrue(result.isPresent());
Assert.assertTrue(result.get() instanceof ExternalAnalyzeStatus);
ExternalAnalyzeStatus externalAnalyzeStatusResult = (ExternalAnalyzeStatus) result.get();
Assert.assertEquals(2, externalAnalyzeStatusResult.getColumns().size());
}
}
