Skip to content

Commit

Permalink
[BugFix] Fix client couldn't cancel forward query (#52185)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <drfeng08@gmail.com>
(cherry picked from commit cc3a33c)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
#	fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java
#	fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
#	gensrc/thrift/FrontendService.thrift
  • Loading branch information
stdpain authored and mergify[bot] committed Oct 24, 2024
1 parent a6264c9 commit 8cf9d37
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 8 deletions.
31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
<<<<<<< HEAD
=======
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
import javax.net.ssl.SSLContext;

// When one client connect in, we create a connection context for it.
Expand Down Expand Up @@ -218,6 +223,14 @@ public class ConnectContext {

private final Map<String, PrepareStmtContext> preparedStmtCtxs = Maps.newHashMap();

<<<<<<< HEAD
=======
private UUID sessionId;

private String proxyHostName;
private AtomicInteger pendingForwardRequests = new AtomicInteger(0);

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
// QueryMaterializationContext is different from MaterializationContext that it keeps the context during the query
// lifecycle instead of per materialized view.
private QueryMaterializationContext queryMVContext;
Expand Down Expand Up @@ -489,6 +502,24 @@ public void setConnectionId(int connectionId) {
this.connectionId = connectionId;
}

public String getProxyHostName() {
return proxyHostName;
}

public void setProxyHostName(String address) {
this.proxyHostName = address;
}

public boolean hasPendingForwardRequest() {
return pendingForwardRequests.intValue() > 0;
}
public void incPendingForwardRequest() {
pendingForwardRequests.incrementAndGet();
}
public void decPendingForwardRequest() {
pendingForwardRequests.decrementAndGet();
}

public void resetConnectionStartTime() {
this.connectionStartTime = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ public Void visitRelation(Relation relation, Void context) {
statement.setOrigStmt(new OriginStatement(request.getSql(), idx));

executor = new StmtExecutor(ctx, statement);
ctx.setExecutor(executor);
executor.setProxy();
executor.execute();
} catch (IOException e) {
Expand All @@ -784,7 +785,13 @@ public Void visitRelation(Relation relation, Void context) {
// Catch all throwable.
// If reach here, maybe StarRocks bug.
LOG.warn("Process one query failed because unknown reason: ", e);
<<<<<<< HEAD
ctx.getState().setError("Unexpected exception: " + e.getMessage());
=======
ctx.getState().setError(e.getMessage());
} finally {
ctx.setExecutor(null);
>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
}

// If stmt is also forwarded during execution, just return the forward result.
Expand Down
46 changes: 46 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,51 @@ public boolean sendResultToChannel(MysqlChannel channel) throws IOException {
public void setResult(TMasterOpResult result) {
this.result = result;
}
<<<<<<< HEAD
=======

public TMasterOpRequest createTMasterOpRequest(ConnectContext ctx, int forwardTimes) {
TMasterOpRequest params = new TMasterOpRequest();
params.setCluster(SystemInfoService.DEFAULT_CLUSTER);
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
params.setUser(ctx.getQualifiedUser());
params.setCatalog(ctx.getCurrentCatalog());
params.setDb(ctx.getDatabase());
params.setSqlMode(ctx.getSessionVariable().getSqlMode());
params.setUser_ip(ctx.getRemoteIP());
params.setTime_zone(ctx.getSessionVariable().getTimeZone());
params.setStmt_id(ctx.getStmtId());
params.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict());
params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift());
params.setForward_times(forwardTimes);
params.setSession_id(ctx.getSessionId().toString());
params.setConnectionId(ctx.getConnectionId());

TUserRoles currentRoles = new TUserRoles();
Preconditions.checkState(ctx.getCurrentRoleIds() != null);
currentRoles.setRole_id_list(new ArrayList<>(ctx.getCurrentRoleIds()));
params.setUser_roles(currentRoles);

params.setIsLastStmt(ctx.getIsLastStmt());

TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setMem_limit(ctx.getSessionVariable().getMaxExecMemByte());
queryOptions.setQuery_timeout(ctx.getSessionVariable().getQueryTimeoutS());
queryOptions.setLoad_mem_limit(ctx.getSessionVariable().getLoadMemLimit());
params.setQuery_options(queryOptions);

params.setQueryId(UUIDUtil.toTUniqueId(ctx.getQueryId()));
// forward all session variables
SetStmt setStmt = ctx.getModifiedSessionVariables();
if (setStmt != null) {
params.setModified_variables_sql(AstToSQLBuilder.toSQL(setStmt));
}

params.setWarehouse_id(ctx.getCurrentWarehouseId());

return params;
}
>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
}

91 changes: 91 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.google.common.collect.Maps;

import java.util.Map;

// When a query is forwarded from the follower FE to the leader FE, this ConnectContext on the leader is
// not managed by the ConnectScheduler. These ConnectContexts are managed by the ProxyContextManager.
// We can find these ConnectContexts on the leader by hostname and connection id, and perform operations such as kill.
public class ProxyContextManager {
private final Map<String, Map<Integer, ConnectContext>> connectionMaps = Maps.newConcurrentMap();

static ProxyContextManager instance = new ProxyContextManager();

public static ProxyContextManager getInstance() {
return instance;
}

public ScopeGuard guard(String hostName, int connectionId, ConnectContext context, boolean set) {
return new ScopeGuard(this, hostName, connectionId, context, set);
}

public synchronized void addContext(String hostname, Integer connectionId, ConnectContext context) {
final Map<Integer, ConnectContext> contextMap =
connectionMaps.computeIfAbsent(hostname, (String host) -> Maps.newConcurrentMap());
contextMap.put(connectionId, context);
contextMap.computeIfAbsent(connectionId, cid -> context);
}

public ConnectContext getContext(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap == null) {
return null;
}
return contextMap.get(connectionId);
}

public ConnectContext getContextByQueryId(String queryId) {
return connectionMaps.values().stream().flatMap(item -> item.values().stream()).filter(ctx ->
ctx.getQueryId() != null && queryId.equals(ctx.getQueryId().toString())).findFirst().orElse(null);
}

public synchronized void remove(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap != null) {
contextMap.remove(connectionId);
if (contextMap.isEmpty()) {
connectionMaps.remove(hostname);
}
}
}

public static class ScopeGuard implements AutoCloseable {
private ProxyContextManager manager;
private boolean set = false;
private String hostName;
private int connectionId;

public ScopeGuard(ProxyContextManager manager, String hostName, int connectionId, ConnectContext context,
boolean set) {
if (set) {
this.manager = manager;
this.hostName = hostName;
this.connectionId = connectionId;
manager.addContext(hostName, connectionId, context);
}
this.set = set;
}

@Override
public void close() throws Exception {
if (set) {
manager.remove(hostName, connectionId);
}
}
}
}
94 changes: 91 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -869,9 +869,14 @@ private void forwardToLeader() throws Exception {
if (parsedStmt instanceof ExecuteStmt) {
throw new AnalysisException("ExecuteStmt Statement don't support statement need to be forward to leader");
}
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
try {
context.incPendingForwardRequest();
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
} finally {
context.decPendingForwardRequest();
}
}

private boolean tryProcessProfileAsync(ExecPlan plan, int retryIndex) {
Expand Down Expand Up @@ -1009,11 +1014,43 @@ public void cancel(String cancelledMessage) {
// Handle kill statement.
private void handleKill() throws DdlException {
KillStmt killStmt = (KillStmt) parsedStmt;
<<<<<<< HEAD
long id = killStmt.getConnectionId();
ConnectContext killCtx = context.getConnectScheduler().getContext(id);
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
=======
if (killStmt.getQueryId() != null) {
handleKillQuery(killStmt.getQueryId());
} else {
long id = killStmt.getConnectionId();
ConnectContext killCtx = null;
if (isProxy) {
final String hostName = context.getProxyHostName();
killCtx = ProxyContextManager.getInstance().getContext(hostName, (int) id);
} else {
killCtx = context.getConnectScheduler().getContext(id);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
handleKill(killCtx, killStmt.isConnectionKill() && !isProxy);
}
}

// Handle kill statement.
private void handleKill(ConnectContext killCtx, boolean killConnection) {
try {
if (killCtx.hasPendingForwardRequest()) {
forwardToLeader();
return;
}
} catch (Exception e) {
LOG.warn("failed to kill connection", e);
}

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
Preconditions.checkNotNull(killCtx);
if (context == killCtx) {
// Suicide
Expand All @@ -1035,6 +1072,57 @@ private void handleKill() throws DdlException {
context.getState().setOk();
}

<<<<<<< HEAD
=======
// Handle kill running query statement.
private void handleKillQuery(String queryId) throws DdlException {
if (StringUtils.isEmpty(queryId)) {
context.getState().setOk();
return;
}
// > 0 means it is forwarded from other fe
if (context.getForwardTimes() == 0) {
String errorMsg = null;
// forward to all fe
for (Frontend fe : GlobalStateMgr.getCurrentState().getNodeMgr().getFrontends(null /* all */)) {
LeaderOpExecutor leaderOpExecutor =
new LeaderOpExecutor(Pair.create(fe.getHost(), fe.getRpcPort()), parsedStmt, originStmt,
context, redirectStatus);
try {
leaderOpExecutor.execute();
// if query is successfully killed by this fe, it can return now
if (context.getState().getStateType() == MysqlStateType.OK) {
context.getState().setOk();
return;
}
errorMsg = context.getState().getErrorMessage();
} catch (TTransportException e) {
errorMsg = "Failed to connect to fe " + fe.getHost() + ":" + fe.getRpcPort();
LOG.warn(errorMsg, e);
} catch (Exception e) {
errorMsg = "Failed to connect to fe " + fe.getHost() + ":" + fe.getRpcPort() + " due to " +
e.getMessage();
LOG.warn(e.getMessage(), e);
}
}
// if the queryId is not found in any fe, print the error message
context.getState().setError(errorMsg == null ? ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg() : errorMsg);
return;
}
ConnectContext killCtx = ExecuteEnv.getInstance().getScheduler().findContextByCustomQueryId(queryId);
if (killCtx == null) {
killCtx = ExecuteEnv.getInstance().getScheduler().findContextByQueryId(queryId);
}
if (killCtx == null) {
killCtx = ProxyContextManager.getInstance().getContextByQueryId(queryId);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
}
handleKill(killCtx, false);
}

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
// Process set statement.
private void handleSetStmt() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import com.starrocks.qe.ConnectProcessor;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.GlobalVariable;
import com.starrocks.qe.ProxyContextManager;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.ShowExecutor;
import com.starrocks.qe.VariableMgr;
Expand Down Expand Up @@ -337,10 +338,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private final LeaderImpl leaderImpl;
private final ExecuteEnv exeEnv;
private final ProxyContextManager proxyContextManager;
public AtomicLong partitionRequestNum = new AtomicLong(0);

public FrontendServiceImpl(ExecuteEnv exeEnv) {
leaderImpl = new LeaderImpl();
proxyContextManager = ProxyContextManager.getInstance();
this.exeEnv = exeEnv;
}

Expand Down Expand Up @@ -1310,12 +1313,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
LOG.info("receive forwarded stmt {} from FE: {}",
params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown");
ConnectContext context = new ConnectContext(null);
ConnectProcessor processor = new ConnectProcessor(context);
TMasterOpResult result = processor.proxyExecute(params);
ConnectContext.remove();
return result;
String hostname = "";
if (clientAddr != null) {
hostname = clientAddr.getHostname();
}
context.setProxyHostName(hostname);
boolean addToProxyManager = params.isSetConnectionId();
final int connectionId = params.getConnectionId();

try (var guard = proxyContextManager.guard(hostname, connectionId, context, addToProxyManager)) {
ConnectProcessor processor = new ConnectProcessor(context);
return processor.proxyExecute(params);
} catch (Exception e) {
LOG.warn("unreachable path:", e);
final TMasterOpResult result = new TMasterOpResult();
result.setErrorMsg(e.getMessage());
return result;
}
}


private void checkPasswordAndLoadPriv(String user, String passwd, String db, String tbl,
String clientIp) throws AuthenticationException {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Expand Down Expand Up @@ -1888,7 +1905,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T
}
}

private TNetworkAddress getClientAddr() {
public TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
Expand Down
Loading

0 comments on commit 8cf9d37

Please sign in to comment.