Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] only call getAliveComputeNodes once per OlapScanNode (backport #52168) #52267

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ public class OlapScanNode extends ScanNode {

private long totalScanRangeBytes = 0;

// Remembers that checkSomeAliveComputeNode() has already completed without throwing, so that later calls can
// return immediately instead of calling getAliveComputeNodes again. In shared-data mode this means a living CN
// was found in the warehouse; in any other run mode no lookup is made and the flag is simply set.
// Intended to be set just once per query; nothing in the code visible here resets it.
private boolean alreadyFoundSomeLivingCn = false;

// Constructs node to scan given data files of table 'tbl'.
// Constructs node to scan given data files of table 'tbl'.
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
Expand Down Expand Up @@ -500,6 +504,25 @@ public List<TScanRangeLocations> updateScanRangeLocations(List<TScanRangeLocatio
return newLocations;
}


/**
 * Verifies that the warehouse has at least one alive compute node, doing the lookup at most once per scan node.
 *
 * <p>Calling {@code getAliveComputeNodes} many times within one request (e.g. once per partition on a table with
 * many partitions) can dominate the frontend's execution time, so the first successful check is recorded in
 * {@code alreadyFoundSomeLivingCn} and every later call is a no-op. Outside shared-data mode nothing is looked up
 * and the flag is set unconditionally.
 *
 * <p>The remembered result may become stale if every CN dies after the first check; that situation is expected
 * to surface later in the query's execution anyway.
 *
 * @throws ErrorReportException with {@code ERR_NO_NODES_IN_WAREHOUSE} when, in shared-data mode, the warehouse
 *                              reports no alive compute node
 */
private void checkSomeAliveComputeNode() throws ErrorReportException {
    if (!alreadyFoundSomeLivingCn) {
        if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
            WarehouseManager whMgr = GlobalStateMgr.getCurrentState().getWarehouseMgr();
            boolean noLivingCn = CollectionUtils.isEmpty(whMgr.getAliveComputeNodes(warehouseId));
            if (noLivingCn) {
                // The warehouse is only resolved on the failure path, to name it in the error.
                String warehouseName = whMgr.getWarehouse(warehouseId).getName();
                throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouseName);
            }
        }
        // Reached only when no exception was thrown above.
        alreadyFoundSomeLivingCn = true;
    }
}

public void addScanRangeLocations(Partition partition,
PhysicalPartition physicalPartition,
MaterializedIndex index,
Expand All @@ -517,13 +540,7 @@ public void addScanRangeLocations(Partition partition,
selectedPartitionNames.add(partition.getName());
selectedPartitionVersions.add(visibleVersion);

if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
if (CollectionUtils.isEmpty(warehouseManager.getAliveComputeNodes(warehouseId))) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}
}
checkSomeAliveComputeNode();
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
LOG.debug("{} tabletId={}", (logNum++), tabletId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,83 @@ public RunMode getCurrentRunMode() {
Assert.assertEquals("No alive backend or compute node in warehouse null.", ex.getMessage());
}

/**
 * Checks that calling addScanRangeLocations twice on the same OlapScanNode results in exactly one call to
 * WarehouseManager.getAliveComputeNodes.
 *
 * There are no explicit assertions in this test: the check is the {@code times = 1} constraint recorded in the
 * Expectations block, which the mocking framework is relied upon to enforce.
 *
 * @param mockWarehouseMgr mocked WarehouseManager injected by the test framework; it is what
 *                         GlobalStateMgr.getWarehouseMgr() is faked to return below
 */
@Test
public void testSelectWorkerGroupByWarehouseId_checkAliveNodesOnce(@Mocked WarehouseManager mockWarehouseMgr)
throws UserException {
// A backend that is marked as not alive and assigned to the default warehouse.
Backend b1 = new Backend(10001L, "192.168.0.1", 9050);
b1.setBePort(9060);
b1.setAlive(false);
b1.setWarehouseId(WarehouseManager.DEFAULT_WAREHOUSE_ID);

// Fake NodeMgr.getClusterInfo() so it hands back the test's systemInfo.
new MockUp<NodeMgr>() {
@Mock
public SystemInfoService getClusterInfo() {
return systemInfo;
}
};

// Every node lookup by id resolves to b1, whatever id is requested.
new MockUp<SystemInfoService>() {
@Mock
public ComputeNode getBackendOrComputeNode(long nodeId) {
return b1;
}
};

// Only the default worker group reports a worker (b1); any other group is empty.
new MockUp<StarOSAgent>() {
@Mock
public List<Long> getWorkersByWorkerGroup(long workerGroupId) throws UserException {
if (workerGroupId == StarOSAgent.DEFAULT_WORKER_GROUP_ID) {
return Lists.newArrayList(b1.getId());
}
return Lists.newArrayList();
}
};

// Route the GlobalStateMgr accessors to the test fixtures and to the injected WarehouseManager mock.
new MockUp<GlobalStateMgr>() {
@Mock
public NodeMgr getNodeMgr() {
return nodeMgr;
}

@Mock
public StarOSAgent getStarOSAgent() {
return starOSAgent;
}

@Mock
public WarehouseManager getWarehouseMgr() {
return mockWarehouseMgr;
}

};

// The one alive compute node that the mocked WarehouseManager returns.
ComputeNode livingCn = new ComputeNode();
livingCn.setAlive(true);
new Expectations() {
{
// This is the point of the test -- we only want to call this once even though we're calling
// addScanRangeLocations multiple times.
mockWarehouseMgr.getAliveComputeNodes(WarehouseManager.DEFAULT_WAREHOUSE_ID);
times = 1;
result = Lists.newArrayList(livingCn);
}
};
// Report shared-data mode for the duration of the test.
// NOTE(review): presumably needed because the alive-CN check is only performed in shared-data mode -- confirm
// against OlapScanNode.
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};

// First call: expected to trigger the single recorded getAliveComputeNodes invocation.
OlapScanNode scanNode = newOlapScanNode();
Partition partition = new Partition(123, "aaa", null, null);
MaterializedIndex index = new MaterializedIndex(1, MaterializedIndex.IndexState.NORMAL);
scanNode.addScanRangeLocations(partition, partition, index, Collections.emptyList(), 1);
// Since this is the second call to addScanRangeLocations on the same OlapScanNode, we do not expect another call to
// getAliveComputeNodes.
scanNode.addScanRangeLocations(partition, partition, index, Collections.emptyList(), 1);
}

private OlapScanNode newOlapScanNode() {
TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
OlapTable table = new OlapTable();
Expand Down
Loading