Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix client couldn't cancel forward query (backport #52185) #52275

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
<<<<<<< HEAD
=======
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
import javax.net.ssl.SSLContext;

// When a client connects, we create a connection context for it.
Expand Down Expand Up @@ -218,6 +223,14 @@ public class ConnectContext {

private final Map<String, PrepareStmtContext> preparedStmtCtxs = Maps.newHashMap();

<<<<<<< HEAD
=======
private UUID sessionId;

private String proxyHostName;
private AtomicInteger pendingForwardRequests = new AtomicInteger(0);

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
// QueryMaterializationContext differs from MaterializationContext in that it keeps the context for the whole
// query lifecycle instead of per materialized view.
private QueryMaterializationContext queryMVContext;
Expand Down Expand Up @@ -489,6 +502,24 @@ public void setConnectionId(int connectionId) {
this.connectionId = connectionId;
}

/**
 * Returns the proxy host name currently stored on this context.
 *
 * @return the stored proxy host name; may be {@code null} if it has not been assigned
 */
public String getProxyHostName() {
    return this.proxyHostName;
}

/**
 * Stores the proxy host name on this context, overwriting any previous value.
 *
 * @param address the host name to record
 */
public void setProxyHostName(String address) {
    proxyHostName = address;
}

/**
 * Reports whether the pending-forward counter is currently positive.
 *
 * @return {@code true} if the counter is greater than zero
 */
public boolean hasPendingForwardRequest() {
    final int pending = pendingForwardRequests.get();
    return pending > 0;
}
/** Atomically adds one to the pending-forward counter. */
public void incPendingForwardRequest() {
    pendingForwardRequests.getAndIncrement();
}
/** Atomically subtracts one from the pending-forward counter. */
public void decPendingForwardRequest() {
    pendingForwardRequests.getAndDecrement();
}

/** Sets the connection start time to the current wall-clock time in milliseconds. */
public void resetConnectionStartTime() {
    final long nowMillis = System.currentTimeMillis();
    connectionStartTime = nowMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ public Void visitRelation(Relation relation, Void context) {
statement.setOrigStmt(new OriginStatement(request.getSql(), idx));

executor = new StmtExecutor(ctx, statement);
ctx.setExecutor(executor);
executor.setProxy();
executor.execute();
} catch (IOException e) {
Expand All @@ -784,7 +785,13 @@ public Void visitRelation(Relation relation, Void context) {
// Catch all throwable.
// If reach here, maybe StarRocks bug.
LOG.warn("Process one query failed because unknown reason: ", e);
<<<<<<< HEAD
ctx.getState().setError("Unexpected exception: " + e.getMessage());
=======
ctx.getState().setError(e.getMessage());
} finally {
ctx.setExecutor(null);
>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
}

// If stmt is also forwarded during execution, just return the forward result.
Expand Down
46 changes: 46 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,51 @@ public boolean sendResultToChannel(MysqlChannel channel) throws IOException {
/**
 * Stores the given result on this executor, overwriting whatever result it held before.
 *
 * @param result the result object to keep
 */
public void setResult(TMasterOpResult result) {
this.result = result;
}
<<<<<<< HEAD
=======

/**
 * Builds the thrift request that carries the current statement, together with the relevant
 * state of {@code ctx}, to another FE.
 *
 * @param ctx          connection context whose user, session and query state are copied into the request
 * @param forwardTimes value written to the request's {@code forward_times} field
 * @return the fully populated request
 * @throws IllegalStateException if {@code ctx.getCurrentRoleIds()} is {@code null}
 */
public TMasterOpRequest createTMasterOpRequest(ConnectContext ctx, int forwardTimes) {
    final TMasterOpRequest request = new TMasterOpRequest();

    // The statement itself and the identity / session it runs under.
    request.setCluster(SystemInfoService.DEFAULT_CLUSTER);
    request.setSql(originStmt.originStmt);
    request.setStmtIdx(originStmt.idx);
    request.setUser(ctx.getQualifiedUser());
    request.setCatalog(ctx.getCurrentCatalog());
    request.setDb(ctx.getDatabase());
    request.setSqlMode(ctx.getSessionVariable().getSqlMode());
    request.setUser_ip(ctx.getRemoteIP());
    request.setTime_zone(ctx.getSessionVariable().getTimeZone());
    request.setStmt_id(ctx.getStmtId());
    request.setEnableStrictMode(ctx.getSessionVariable().getEnableInsertStrict());
    request.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift());
    request.setForward_times(forwardTimes);
    request.setSession_id(ctx.getSessionId().toString());
    request.setConnectionId(ctx.getConnectionId());

    // Active roles; the context must already have a role set at this point.
    Preconditions.checkState(ctx.getCurrentRoleIds() != null);
    final TUserRoles roles = new TUserRoles();
    roles.setRole_id_list(new ArrayList<>(ctx.getCurrentRoleIds()));
    request.setUser_roles(roles);

    request.setIsLastStmt(ctx.getIsLastStmt());

    // Resource limits taken from the session variables.
    final TQueryOptions options = new TQueryOptions();
    options.setMem_limit(ctx.getSessionVariable().getMaxExecMemByte());
    options.setQuery_timeout(ctx.getSessionVariable().getQueryTimeoutS());
    options.setLoad_mem_limit(ctx.getSessionVariable().getLoadMemLimit());
    request.setQuery_options(options);

    request.setQueryId(UUIDUtil.toTUniqueId(ctx.getQueryId()));

    // Forward all modified session variables as SQL so the receiver can apply them.
    final SetStmt modifiedVariables = ctx.getModifiedSessionVariables();
    if (modifiedVariables != null) {
        request.setModified_variables_sql(AstToSQLBuilder.toSQL(modifiedVariables));
    }

    request.setWarehouse_id(ctx.getCurrentWarehouseId());

    return request;
}
>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
}

91 changes: 91 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.google.common.collect.Maps;

import java.util.Map;

// When a query is forwarded from a follower FE to the leader FE, the ConnectContext created on the leader is
// not managed by the ConnectScheduler. Such ConnectContexts are managed by the ProxyContextManager instead.
// We can find these ConnectContexts on the leader by hostname and connection id, and perform operations such as kill.
public class ProxyContextManager {
    // Outer key: host name; inner key: connection id; value: the context registered for that pair.
    private final Map<String, Map<Integer, ConnectContext>> connectionMaps = Maps.newConcurrentMap();

    // Process-wide instance handed out by getInstance().
    static ProxyContextManager instance = new ProxyContextManager();

    /**
     * Returns the shared manager instance.
     */
    public static ProxyContextManager getInstance() {
        return instance;
    }

    /**
     * Creates a guard that registers {@code context} for the lifetime of a try-with-resources block.
     *
     * @param hostName     host name the context is registered under
     * @param connectionId connection id the context is registered under
     * @param context      context to register
     * @param set          when {@code false} the returned guard registers nothing and its
     *                     {@code close()} is a no-op
     * @return a guard whose {@code close()} removes the registration again
     */
    public ScopeGuard guard(String hostName, int connectionId, ConnectContext context, boolean set) {
        return new ScopeGuard(this, hostName, connectionId, context, set);
    }

    /**
     * Registers {@code context} under {@code (hostname, connectionId)}, replacing any context
     * previously registered under the same pair.
     */
    public synchronized void addContext(String hostname, Integer connectionId, ConnectContext context) {
        // A single put is enough: the former extra computeIfAbsent on the same key could never insert anything.
        connectionMaps.computeIfAbsent(hostname, host -> Maps.newConcurrentMap()).put(connectionId, context);
    }

    /**
     * Looks up the context registered under {@code (hostname, connectionId)}.
     *
     * @return the registered context, or {@code null} if there is none
     */
    public ConnectContext getContext(String hostname, Integer connectionId) {
        final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
        if (contextMap == null) {
            return null;
        }
        return contextMap.get(connectionId);
    }

    /**
     * Finds the first registered context whose query id, rendered with {@code toString()},
     * equals {@code queryId}.
     *
     * @param queryId textual query id to search for; {@code null} never matches
     * @return a matching context, or {@code null} if none is registered
     */
    public ConnectContext getContextByQueryId(String queryId) {
        if (queryId == null) {
            // Nothing can match; avoids a NullPointerException inside the filter below.
            return null;
        }
        return connectionMaps.values().stream()
                .flatMap(contextMap -> contextMap.values().stream())
                .filter(ctx -> ctx.getQueryId() != null && queryId.equals(ctx.getQueryId().toString()))
                .findFirst()
                .orElse(null);
    }

    /**
     * Removes the registration for {@code (hostname, connectionId)}, if any, and drops the
     * per-host map once it becomes empty.
     */
    public synchronized void remove(String hostname, Integer connectionId) {
        final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
        if (contextMap == null) {
            return;
        }
        contextMap.remove(connectionId);
        if (contextMap.isEmpty()) {
            connectionMaps.remove(hostname);
        }
    }

    /**
     * Try-with-resources helper: registers a context on construction (when {@code set} is true)
     * and removes the registration again in {@link #close()}.
     */
    public static class ScopeGuard implements AutoCloseable {
        private final ProxyContextManager manager;
        private final boolean set;
        private final String hostName;
        private final int connectionId;

        public ScopeGuard(ProxyContextManager manager, String hostName, int connectionId, ConnectContext context,
                          boolean set) {
            this.manager = manager;
            this.hostName = hostName;
            this.connectionId = connectionId;
            this.set = set;
            if (set) {
                manager.addContext(hostName, connectionId, context);
            }
        }

        /**
         * Removes the registration made by the constructor; does nothing when the guard was
         * created with {@code set == false}. Declares no checked exception, so callers are not
         * forced to handle one.
         */
        @Override
        public void close() {
            if (set) {
                manager.remove(hostName, connectionId);
            }
        }
    }
}
94 changes: 91 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -869,9 +869,14 @@ private void forwardToLeader() throws Exception {
if (parsedStmt instanceof ExecuteStmt) {
throw new AnalysisException("ExecuteStmt Statement don't support statement need to be forward to leader");
}
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
try {
context.incPendingForwardRequest();
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
} finally {
context.decPendingForwardRequest();
}
}

private boolean tryProcessProfileAsync(ExecPlan plan, int retryIndex) {
Expand Down Expand Up @@ -1009,11 +1014,43 @@ public void cancel(String cancelledMessage) {
// Handle kill statement.
private void handleKill() throws DdlException {
KillStmt killStmt = (KillStmt) parsedStmt;
<<<<<<< HEAD
long id = killStmt.getConnectionId();
ConnectContext killCtx = context.getConnectScheduler().getContext(id);
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
=======
if (killStmt.getQueryId() != null) {
handleKillQuery(killStmt.getQueryId());
} else {
long id = killStmt.getConnectionId();
ConnectContext killCtx = null;
if (isProxy) {
final String hostName = context.getProxyHostName();
killCtx = ProxyContextManager.getInstance().getContext(hostName, (int) id);
} else {
killCtx = context.getConnectScheduler().getContext(id);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
handleKill(killCtx, killStmt.isConnectionKill() && !isProxy);
}
}

// Handle kill statement.
private void handleKill(ConnectContext killCtx, boolean killConnection) {
try {
if (killCtx.hasPendingForwardRequest()) {
forwardToLeader();
return;
}
} catch (Exception e) {
LOG.warn("failed to kill connection", e);
}

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
Preconditions.checkNotNull(killCtx);
if (context == killCtx) {
// Suicide
Expand All @@ -1035,6 +1072,57 @@ private void handleKill() throws DdlException {
context.getState().setOk();
}

<<<<<<< HEAD
=======
/**
 * Handles a kill statement that targets a running query by its query id.
 *
 * <p>An empty query id is acknowledged with OK. If this statement has not been forwarded yet
 * (forward count is zero) it is sent to every frontend in turn until one reports OK; otherwise
 * the query is looked up on this node and killed here.
 *
 * @param queryId textual id of the query to kill
 * @throws DdlException declared because the missing-query case is reported through
 *                      {@code ErrorReport.reportDdlException}
 */
private void handleKillQuery(String queryId) throws DdlException {
    if (StringUtils.isEmpty(queryId)) {
        context.getState().setOk();
        return;
    }

    // A non-zero forward count means another FE forwarded this statement: resolve the query locally.
    if (context.getForwardTimes() != 0) {
        ConnectContext target = ExecuteEnv.getInstance().getScheduler().findContextByCustomQueryId(queryId);
        if (target == null) {
            target = ExecuteEnv.getInstance().getScheduler().findContextByQueryId(queryId);
        }
        if (target == null) {
            target = ProxyContextManager.getInstance().getContextByQueryId(queryId);
        }
        if (target == null) {
            ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
        }
        handleKill(target, false);
        return;
    }

    // Not forwarded yet: ask every FE, stopping at the first one that reports success.
    String lastError = null;
    for (Frontend fe : GlobalStateMgr.getCurrentState().getNodeMgr().getFrontends(null /* all */)) {
        LeaderOpExecutor forwarder =
                new LeaderOpExecutor(Pair.create(fe.getHost(), fe.getRpcPort()), parsedStmt, originStmt,
                        context, redirectStatus);
        try {
            forwarder.execute();
            // if query is successfully killed by this fe, it can return now
            if (MysqlStateType.OK == context.getState().getStateType()) {
                context.getState().setOk();
                return;
            }
            lastError = context.getState().getErrorMessage();
        } catch (TTransportException e) {
            lastError = "Failed to connect to fe " + fe.getHost() + ":" + fe.getRpcPort();
            LOG.warn(lastError, e);
        } catch (Exception e) {
            lastError = "Failed to connect to fe " + fe.getHost() + ":" + fe.getRpcPort() + " due to " +
                    e.getMessage();
            LOG.warn(e.getMessage(), e);
        }
    }

    // if the queryId is not found in any fe, print the error message
    if (lastError == null) {
        context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg());
    } else {
        context.getState().setError(lastError);
    }
}

>>>>>>> cc3a33cd92 ([BugFix] Fix client couldn't cancel forward query (#52185))
// Process set statement.
private void handleSetStmt() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import com.starrocks.qe.ConnectProcessor;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.GlobalVariable;
import com.starrocks.qe.ProxyContextManager;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.ShowExecutor;
import com.starrocks.qe.VariableMgr;
Expand Down Expand Up @@ -337,10 +338,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private final LeaderImpl leaderImpl;
private final ExecuteEnv exeEnv;
private final ProxyContextManager proxyContextManager;
public AtomicLong partitionRequestNum = new AtomicLong(0);

/**
 * Creates the service, keeping the supplied execution environment and wiring in a new
 * {@code LeaderImpl} plus the shared {@code ProxyContextManager} instance.
 *
 * @param exeEnv execution environment stored on this service
 */
public FrontendServiceImpl(ExecuteEnv exeEnv) {
    this.exeEnv = exeEnv;
    this.leaderImpl = new LeaderImpl();
    this.proxyContextManager = ProxyContextManager.getInstance();
}

Expand Down Expand Up @@ -1310,12 +1313,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
LOG.info("receive forwarded stmt {} from FE: {}",
params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown");
ConnectContext context = new ConnectContext(null);
ConnectProcessor processor = new ConnectProcessor(context);
TMasterOpResult result = processor.proxyExecute(params);
ConnectContext.remove();
return result;
String hostname = "";
if (clientAddr != null) {
hostname = clientAddr.getHostname();
}
context.setProxyHostName(hostname);
boolean addToProxyManager = params.isSetConnectionId();
final int connectionId = params.getConnectionId();

try (var guard = proxyContextManager.guard(hostname, connectionId, context, addToProxyManager)) {
ConnectProcessor processor = new ConnectProcessor(context);
return processor.proxyExecute(params);
} catch (Exception e) {
LOG.warn("unreachable path:", e);
final TMasterOpResult result = new TMasterOpResult();
result.setErrorMsg(e.getMessage());
return result;
}
}


private void checkPasswordAndLoadPriv(String user, String passwd, String db, String tbl,
String clientIp) throws AuthenticationException {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Expand Down Expand Up @@ -1888,7 +1905,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T
}
}

private TNetworkAddress getClientAddr() {
public TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
Expand Down
Loading
Loading