Skip to content

Commit

Permalink
[BugFix] Fix client couldn't cancel forward query
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <drfeng08@gmail.com>
  • Loading branch information
stdpain committed Oct 24, 2024
1 parent 6e55970 commit cddfabd
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 10 deletions.
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;

// When one client connect in, we create a connection context for it.
Expand Down Expand Up @@ -236,6 +237,9 @@ public class ConnectContext {

private UUID sessionId;

private String proxyHostName;
private AtomicInteger pendingForwardRequests = new AtomicInteger(0);

// QueryMaterializationContext is different from MaterializationContext that it keeps the context during the query
// lifecycle instead of per materialized view.
private QueryMaterializationContext queryMVContext;
Expand Down Expand Up @@ -575,6 +579,24 @@ public void setConnectionId(int connectionId) {
this.connectionId = connectionId;
}

public String getProxyHostName() {
return proxyHostName;
}

public void setProxyHostName(String address) {
this.proxyHostName = address;
}

public boolean hasPendingForwardRequest() {
return pendingForwardRequests.intValue() > 0;
}
public void incPendingForwardRequest() {
pendingForwardRequests.incrementAndGet();
}
public void decPendingForwardRequest() {
pendingForwardRequests.decrementAndGet();
}

public void resetConnectionStartTime() {
this.connectionStartTime = System.currentTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ public Void visitRelation(Relation relation, Void context) {
statement.setOrigStmt(new OriginStatement(request.getSql(), idx));

executor = new StmtExecutor(ctx, statement);
ctx.setExecutor(executor);
executor.setProxy();
executor.execute();
} catch (IOException e) {
Expand All @@ -832,6 +833,8 @@ public Void visitRelation(Relation relation, Void context) {
// If reach here, maybe StarRocks bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError(e.getMessage());
} finally {
ctx.setExecutor(null);
}

// If stmt is also forwarded during execution, just return the forward result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public TMasterOpRequest createTMasterOpRequest(ConnectContext ctx, int forwardTi
params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift());
params.setForward_times(forwardTimes);
params.setSession_id(ctx.getSessionId().toString());
params.setConnectionId(ctx.getConnectionId());

TUserRoles currentRoles = new TUserRoles();
Preconditions.checkState(ctx.getCurrentRoleIds() != null);
Expand Down
91 changes: 91 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.google.common.collect.Maps;

import java.util.Map;

// When a query is forwarded from the follower FE to the leader FE, this ConnectContext on the leader is
// not managed by the ConnectScheduler. These ConnectContexts are managed by the ProxyContextManager.
// We can find these ConnectContexts on the leader by hostname and connection id, and perform operations such as kill.
public class ProxyContextManager {
private final Map<String, Map<Integer, ConnectContext>> connectionMaps = Maps.newConcurrentMap();

static ProxyContextManager instance = new ProxyContextManager();

public static ProxyContextManager getInstance() {
return instance;
}

public ScopeGuard guard(String hostName, int connectionId, ConnectContext context, boolean set) {
return new ScopeGuard(this, hostName, connectionId, context, set);
}

public synchronized void addContext(String hostname, Integer connectionId, ConnectContext context) {
final Map<Integer, ConnectContext> contextMap =
connectionMaps.computeIfAbsent(hostname, (String host) -> Maps.newConcurrentMap());
contextMap.put(connectionId, context);
contextMap.computeIfAbsent(connectionId, cid -> context);
}

public ConnectContext getContext(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap == null) {
return null;
}
return contextMap.get(connectionId);
}

public ConnectContext getContextByQueryId(String queryId) {
return connectionMaps.values().stream().flatMap(item -> item.values().stream()).filter(ctx ->
ctx.getQueryId() != null && queryId.equals(ctx.getQueryId().toString())).findFirst().orElse(null);
}

public synchronized void remove(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap != null) {
contextMap.remove(connectionId);
if (contextMap.isEmpty()) {
connectionMaps.remove(hostname);
}
}
}

public static class ScopeGuard implements AutoCloseable {
private ProxyContextManager manager;
private boolean set = false;
private String hostName;
private int connectionId;

public ScopeGuard(ProxyContextManager manager, String hostName, int connectionId, ConnectContext context,
boolean set) {
if (set) {
this.manager = manager;
this.hostName = hostName;
this.connectionId = connectionId;
manager.addContext(hostName, connectionId, context);
}
this.set = set;
}

@Override
public void close() throws Exception {
if (set) {
manager.remove(hostName, connectionId);
}
}
}
}
33 changes: 28 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -913,9 +913,14 @@ private void forwardToLeader() throws Exception {
if (parsedStmt instanceof ExecuteStmt) {
throw new AnalysisException("ExecuteStmt Statement don't support statement need to be forward to leader");
}
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
try {
context.incPendingForwardRequest();
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
} finally {
context.decPendingForwardRequest();
}
}

private boolean tryProcessProfileAsync(ExecPlan plan, int retryIndex) {
Expand Down Expand Up @@ -1031,16 +1036,31 @@ private void handleKill() throws DdlException {
handleKillQuery(killStmt.getQueryId());
} else {
long id = killStmt.getConnectionId();
ConnectContext killCtx = context.getConnectScheduler().getContext(id);
ConnectContext killCtx = null;
if (isProxy) {
final String hostName = context.getProxyHostName();
killCtx = ProxyContextManager.getInstance().getContext(hostName, (int) id);
} else {
killCtx = context.getConnectScheduler().getContext(id);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
handleKill(killCtx, killStmt.isConnectionKill());
handleKill(killCtx, killStmt.isConnectionKill() && !isProxy);
}
}

// Handle kill statement.
private void handleKill(ConnectContext killCtx, boolean killConnection) {
try {
if (killCtx.hasPendingForwardRequest()) {
forwardToLeader();
return;
}
} catch (Exception e) {
LOG.warn("failed to kill connection", e);
}

Preconditions.checkNotNull(killCtx);
if (context == killCtx) {
// Suicide
Expand Down Expand Up @@ -1101,6 +1121,9 @@ private void handleKillQuery(String queryId) throws DdlException {
if (killCtx == null) {
killCtx = ExecuteEnv.getInstance().getScheduler().findContextByQueryId(queryId);
}
if (killCtx == null) {
killCtx = ProxyContextManager.getInstance().getContextByQueryId(queryId);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import com.starrocks.qe.ConnectProcessor;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.GlobalVariable;
import com.starrocks.qe.ProxyContextManager;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.QueryStatisticsInfo;
import com.starrocks.qe.ShowExecutor;
Expand Down Expand Up @@ -351,10 +352,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private final LeaderImpl leaderImpl;
private final ExecuteEnv exeEnv;
private final ProxyContextManager proxyContextManager;
public AtomicLong partitionRequestNum = new AtomicLong(0);

public FrontendServiceImpl(ExecuteEnv exeEnv) {
leaderImpl = new LeaderImpl();
proxyContextManager = ProxyContextManager.getInstance();
this.exeEnv = exeEnv;
}

Expand Down Expand Up @@ -1120,12 +1123,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
LOG.info("receive forwarded stmt {} from FE: {}",
params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown");
ConnectContext context = new ConnectContext(null);
ConnectProcessor processor = new ConnectProcessor(context);
TMasterOpResult result = processor.proxyExecute(params);
ConnectContext.remove();
return result;
String hostname = "";
if (clientAddr != null) {
hostname = clientAddr.getHostname();
}
context.setProxyHostName(hostname);
boolean addToProxyManager = params.isSetConnectionId();
final int connectionId = params.getConnectionId();

try (var guard = proxyContextManager.guard(hostname, connectionId, context, addToProxyManager)) {
ConnectProcessor processor = new ConnectProcessor(context);
return processor.proxyExecute(params);
} catch (Exception e) {
LOG.warn("unreachable path:", e);
final TMasterOpResult result = new TMasterOpResult();
result.setErrorMsg(e.getMessage());
return result;
}
}


private void checkPasswordAndLoadPriv(String user, String passwd, String db, String tbl,
String clientIp) throws AuthenticationException {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
Expand Down Expand Up @@ -1712,7 +1729,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T
}
}

private TNetworkAddress getClientAddr() {
public TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
Expand Down
104 changes: 104 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/qe/ForwardTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.starrocks.common.util.UUIDUtil;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.service.ExecuteEnv;
import com.starrocks.service.FrontendServiceImpl;
import com.starrocks.thrift.TMasterOpRequest;
import com.starrocks.thrift.TMasterOpResult;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TUserIdentity;
import mockit.Mock;
import mockit.MockUp;
import org.junit.Assert;
import org.junit.Test;

public class ForwardTest {

TMasterOpRequest makeRequest() {
TMasterOpRequest request = new TMasterOpRequest();
request.setCatalog("default");
request.setCluster("default");
request.setDb("information_schema");
request.setQueryId(UUIDUtil.genTUniqueId());
final TUserIdentity userIdentity = new TUserIdentity();
request.setCurrent_user_ident(userIdentity);
return request;
}

@Test
public void testKillConnection() throws Exception {
final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
final TMasterOpRequest request = makeRequest();
request.setConnectionId(1);
request.setSql("kill QUERY 1");
new MockUp<GlobalStateMgr>() {
@Mock
public Long getMaxJournalId() {
return 1L;
}
};
final TMasterOpResult result = service.forward(request);
Assert.assertEquals(result.errorMsg, "");
}

@Test
public void testUpgradeKillConnection() throws Exception {
final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
final TMasterOpRequest request = makeRequest();
request.setSql("kill QUERY 1");
new MockUp<GlobalStateMgr>() {
@Mock
public Long getMaxJournalId() {
return 1L;
}
};
final TMasterOpResult result = service.forward(request);
Assert.assertEquals(result.errorMsg, "Unknown thread id: 1");
}

@Test
public void testKillWithUnknownException() throws Exception {
final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
final TMasterOpRequest request = makeRequest();
request.setSql("kill QUERY 1");
request.setConnectionId(1);

new MockUp<GlobalStateMgr>() {
@Mock
public Long getMaxJournalId() {
return 1L;
}
};
new MockUp<FrontendServiceImpl>() {
@Mock
public TNetworkAddress getClientAddr() {
return null;
}
};

new MockUp<ConnectProcessor>() {
@Mock
public TMasterOpResult proxyExecute(TMasterOpRequest request) {
throw new RuntimeException("unknown exception");
}
};

final TMasterOpResult result = service.forward(request);
Assert.assertEquals(result.errorMsg, "unknown exception");
}
}
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ struct TMasterOpRequest {
33: optional Types.TUserRoles user_roles
34: optional i32 forward_times
35: optional string session_id
36: optional i32 connectionId

101: optional i64 warehouse_id // begin from 101, in case of conflict with other's change
}
Expand Down

0 comments on commit cddfabd

Please sign in to comment.