Skip to content

Commit

Permalink
[BugFix] Fix client couldn't cancel forward query (#52185)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <drfeng08@gmail.com>
(cherry picked from commit cc3a33c)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
  • Loading branch information
stdpain authored and mergify[bot] committed Oct 24, 2024
1 parent 6308edb commit 0eeba11
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 8 deletions.
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;

// When one client connect in, we create a connection context for it.
Expand Down Expand Up @@ -236,6 +237,9 @@ public class ConnectContext {

private UUID sessionId;

private String proxyHostName;
private AtomicInteger pendingForwardRequests = new AtomicInteger(0);

// QueryMaterializationContext is different from MaterializationContext that it keeps the context during the query
// lifecycle instead of per materialized view.
private QueryMaterializationContext queryMVContext;
Expand Down Expand Up @@ -575,6 +579,24 @@ public void setConnectionId(int connectionId) {
this.connectionId = connectionId;
}

public String getProxyHostName() {
return proxyHostName;
}

public void setProxyHostName(String address) {
this.proxyHostName = address;
}

public boolean hasPendingForwardRequest() {
return pendingForwardRequests.intValue() > 0;
}
public void incPendingForwardRequest() {
pendingForwardRequests.incrementAndGet();
}
public void decPendingForwardRequest() {
pendingForwardRequests.decrementAndGet();
}

public void resetConnectionStartTime() {
this.connectionStartTime = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ public Void visitRelation(Relation relation, Void context) {
statement.setOrigStmt(new OriginStatement(request.getSql(), idx));

executor = new StmtExecutor(ctx, statement);
ctx.setExecutor(executor);
executor.setProxy();
executor.execute();
} catch (IOException e) {
Expand All @@ -821,6 +822,8 @@ public Void visitRelation(Relation relation, Void context) {
// If reach here, maybe StarRocks bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError(e.getMessage());
} finally {
ctx.setExecutor(null);
}

// If stmt is also forwarded during execution, just return the forward result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public TMasterOpRequest createTMasterOpRequest(ConnectContext ctx, int forwardTi
params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift());
params.setForward_times(forwardTimes);
params.setSession_id(ctx.getSessionId().toString());
params.setConnectionId(ctx.getConnectionId());

TUserRoles currentRoles = new TUserRoles();
Preconditions.checkState(ctx.getCurrentRoleIds() != null);
Expand Down
91 changes: 91 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.google.common.collect.Maps;

import java.util.Map;

// When a query is forwarded from the follower FE to the leader FE, this ConnectContext on the leader is
// not managed by the ConnectScheduler. These ConnectContexts are managed by the ProxyContextManager.
// We can find these ConnectContexts on the leader by hostname and connection id, and perform operations such as kill.
public class ProxyContextManager {
private final Map<String, Map<Integer, ConnectContext>> connectionMaps = Maps.newConcurrentMap();

static ProxyContextManager instance = new ProxyContextManager();

public static ProxyContextManager getInstance() {
return instance;
}

public ScopeGuard guard(String hostName, int connectionId, ConnectContext context, boolean set) {
return new ScopeGuard(this, hostName, connectionId, context, set);
}

public synchronized void addContext(String hostname, Integer connectionId, ConnectContext context) {
final Map<Integer, ConnectContext> contextMap =
connectionMaps.computeIfAbsent(hostname, (String host) -> Maps.newConcurrentMap());
contextMap.put(connectionId, context);
contextMap.computeIfAbsent(connectionId, cid -> context);
}

public ConnectContext getContext(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap == null) {
return null;
}
return contextMap.get(connectionId);
}

public ConnectContext getContextByQueryId(String queryId) {
return connectionMaps.values().stream().flatMap(item -> item.values().stream()).filter(ctx ->
ctx.getQueryId() != null && queryId.equals(ctx.getQueryId().toString())).findFirst().orElse(null);
}

public synchronized void remove(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap != null) {
contextMap.remove(connectionId);
if (contextMap.isEmpty()) {
connectionMaps.remove(hostname);
}
}
}

public static class ScopeGuard implements AutoCloseable {
private ProxyContextManager manager;
private boolean set = false;
private String hostName;
private int connectionId;

public ScopeGuard(ProxyContextManager manager, String hostName, int connectionId, ConnectContext context,
boolean set) {
if (set) {
this.manager = manager;
this.hostName = hostName;
this.connectionId = connectionId;
manager.addContext(hostName, connectionId, context);
}
this.set = set;
}

@Override
public void close() throws Exception {
if (set) {
manager.remove(hostName, connectionId);
}
}
}
}
94 changes: 91 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -908,9 +908,14 @@ private void forwardToLeader() throws Exception {
if (parsedStmt instanceof ExecuteStmt) {
throw new AnalysisException("ExecuteStmt Statement don't support statement need to be forward to leader");
}
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
try {
context.incPendingForwardRequest();
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
} finally {
context.decPendingForwardRequest();
}
}

private boolean tryProcessProfileAsync(ExecPlan plan, int retryIndex) {
Expand Down Expand Up @@ -1022,11 +1027,43 @@ public void cancel(String cancelledMessage) {
// Handle kill statement.
private void handleKill() throws DdlException {
KillStmt killStmt = (KillStmt) parsedStmt;
<<<<<<< HEAD
long id = killStmt.getConnectionId();
ConnectContext killCtx = context.getConnectScheduler().getContext(id);
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
=======
if (killStmt.getQueryId() != null) {
handleKillQuery(killStmt.getQueryId());
} else {
long id = killStmt.getConnectionId();
ConnectContext killCtx = null;
if (isProxy) {
final String hostName = context.getProxyHostName();
killCtx = ProxyContextManager.getInstance().getContext(hostName, (int) id);
} else {
killCtx = context.getConnectScheduler().getContext(id);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
handleKill(killCtx, killStmt.isConnectionKill() && !isProxy);
}
}

// Handle kill statement.
private void handleKill(ConnectContext killCtx, boolean killConnection) {
try {
if (killCtx.hasPendingForwardRequest()) {
forwardToLeader();
return;
}
} catch (Exception e) {
LOG.warn("failed to kill connection", e);
}

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
Preconditions.checkNotNull(killCtx);
if (context == killCtx) {
// Suicide
Expand All @@ -1048,6 +1085,57 @@ private void handleKill() throws DdlException {
context.getState().setOk();
}

<<<<<<< HEAD
=======
// Handle kill running query statement.
private void handleKillQuery(String queryId) throws DdlException {
if (StringUtils.isEmpty(queryId)) {
context.getState().setOk();
return;
}
// > 0 means it is forwarded from other fe
if (context.getForwardTimes() == 0) {
String errorMsg = null;
// forward to all fe
for (Frontend fe : GlobalStateMgr.getCurrentState().getNodeMgr().getFrontends(null /* all */)) {
LeaderOpExecutor leaderOpExecutor =
new LeaderOpExecutor(Pair.create(fe.getHost(), fe.getRpcPort()), parsedStmt, originStmt,
context, redirectStatus);
try {
leaderOpExecutor.execute();
// if query is successfully killed by this fe, it can return now
if (context.getState().getStateType() == MysqlStateType.OK) {
context.getState().setOk();
return;
}
errorMsg = context.getState().getErrorMessage();
} catch (TTransportException e) {
errorMsg = "Failed to connect to fe " + fe.getHost() + ":" + fe.getRpcPort();
LOG.warn(errorMsg, e);
} catch (Exception e) {
errorMsg = "Failed to connect to fe " + fe.getHost() + ":" + fe.getRpcPort() + " due to " +
e.getMessage();
LOG.warn(e.getMessage(), e);
}
}
// if the queryId is not found in any fe, print the error message
context.getState().setError(errorMsg == null ? ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg() : errorMsg);
return;
}
ConnectContext killCtx = ExecuteEnv.getInstance().getScheduler().findContextByCustomQueryId(queryId);
if (killCtx == null) {
killCtx = ExecuteEnv.getInstance().getScheduler().findContextByQueryId(queryId);
}
if (killCtx == null) {
killCtx = ProxyContextManager.getInstance().getContextByQueryId(queryId);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
}
handleKill(killCtx, false);
}

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
// Process set statement.
private void handleSetStmt() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import com.starrocks.qe.ConnectProcessor;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.GlobalVariable;
import com.starrocks.qe.ProxyContextManager;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.QueryStatisticsInfo;
import com.starrocks.qe.ShowExecutor;
Expand Down Expand Up @@ -350,10 +351,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private final LeaderImpl leaderImpl;
private final ExecuteEnv exeEnv;
private final ProxyContextManager proxyContextManager;
public AtomicLong partitionRequestNum = new AtomicLong(0);

public FrontendServiceImpl(ExecuteEnv exeEnv) {
leaderImpl = new LeaderImpl();
proxyContextManager = ProxyContextManager.getInstance();
this.exeEnv = exeEnv;
}

Expand Down Expand Up @@ -1117,12 +1120,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
LOG.info("receive forwarded stmt {} from FE: {}",
params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown");
ConnectContext context = new ConnectContext(null);
ConnectProcessor processor = new ConnectProcessor(context);
TMasterOpResult result = processor.proxyExecute(params);
ConnectContext.remove();
return result;
String hostname = "";
if (clientAddr != null) {
hostname = clientAddr.getHostname();
}
context.setProxyHostName(hostname);
boolean addToProxyManager = params.isSetConnectionId();
final int connectionId = params.getConnectionId();

try (var guard = proxyContextManager.guard(hostname, connectionId, context, addToProxyManager)) {
ConnectProcessor processor = new ConnectProcessor(context);
return processor.proxyExecute(params);
} catch (Exception e) {
LOG.warn("unreachable path:", e);
final TMasterOpResult result = new TMasterOpResult();
result.setErrorMsg(e.getMessage());
return result;
}
}


private void checkPasswordAndLoadPriv(String user, String passwd, String db, String tbl,
String clientIp) throws AuthenticationException {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Expand Down Expand Up @@ -1708,7 +1725,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T
}
}

private TNetworkAddress getClientAddr() {
public TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
Expand Down
Loading

0 comments on commit 0eeba11

Please sign in to comment.