diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index 5a7ebccf69cee..88464b8951f0d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -97,6 +97,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; // When one client connect in, we create a connection context for it. @@ -236,6 +237,9 @@ public class ConnectContext { private UUID sessionId; + private String proxyHostName; + private AtomicInteger pendingForwardRequests = new AtomicInteger(0); + // QueryMaterializationContext is different from MaterializationContext that it keeps the context during the query // lifecycle instead of per materialized view. private QueryMaterializationContext queryMVContext; @@ -575,6 +579,24 @@ public void setConnectionId(int connectionId) { this.connectionId = connectionId; } + public String getProxyHostName() { + return proxyHostName; + } + + public void setProxyHostName(String address) { + this.proxyHostName = address; + } + + public boolean hasPendingForwardRequest() { + return pendingForwardRequests.intValue() > 0; + } + public void incPendingForwardRequest() { + pendingForwardRequests.incrementAndGet(); + } + public void decPendingForwardRequest() { + pendingForwardRequests.decrementAndGet(); + } + public void resetConnectionStartTime() { this.connectionStartTime = System.currentTimeMillis(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java index 2fbf109112b5d..ccb95764af4db 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java @@ -821,6 +821,7 @@ public Void visitRelation(Relation relation, Void context) { statement.setOrigStmt(new OriginStatement(request.getSql(), idx)); executor = new StmtExecutor(ctx, statement); + ctx.setExecutor(executor); executor.setProxy(); executor.execute(); } catch (IOException e) { @@ -832,6 +833,8 @@ public Void visitRelation(Relation relation, Void context) { // If reach here, maybe StarRocks bug. LOG.warn("Process one query failed because unknown reason: ", e); ctx.getState().setError(e.getMessage()); + } finally { + ctx.setExecutor(null); } // If stmt is also forwarded during execution, just return the forward result. diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java index 7e1bced5ea4ec..9eb3202c66481 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java @@ -243,6 +243,7 @@ public TMasterOpRequest createTMasterOpRequest(ConnectContext ctx, int forwardTi params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift()); params.setForward_times(forwardTimes); params.setSession_id(ctx.getSessionId().toString()); + params.setConnectionId(ctx.getConnectionId()); TUserRoles currentRoles = new TUserRoles(); Preconditions.checkState(ctx.getCurrentRoleIds() != null); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java b/fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java new file mode 100644 index 0000000000000..136c788debdf3 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java @@ -0,0 +1,91 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe; + +import com.google.common.collect.Maps; + +import java.util.Map; + +// When a query is forwarded from the follower FE to the leader FE, this ConnectContext on the leader is +// not managed by the ConnectScheduler. These ConnectContexts are managed by the ProxyContextManager. +// We can find these ConnectContexts on the leader by hostname and connection id, and perform operations such as kill. +public class ProxyContextManager { + private final Map> connectionMaps = Maps.newConcurrentMap(); + + static ProxyContextManager instance = new ProxyContextManager(); + + public static ProxyContextManager getInstance() { + return instance; + } + + public ScopeGuard guard(String hostName, int connectionId, ConnectContext context, boolean set) { + return new ScopeGuard(this, hostName, connectionId, context, set); + } + + public synchronized void addContext(String hostname, Integer connectionId, ConnectContext context) { + final Map contextMap = + connectionMaps.computeIfAbsent(hostname, (String host) -> Maps.newConcurrentMap()); + contextMap.put(connectionId, context); + contextMap.computeIfAbsent(connectionId, cid -> context); + } + + public ConnectContext getContext(String hostname, Integer connectionId) { + final Map contextMap = connectionMaps.get(hostname); + if (contextMap == null) { + return null; + } + return contextMap.get(connectionId); + } + + public ConnectContext getContextByQueryId(String queryId) { + return connectionMaps.values().stream().flatMap(item -> item.values().stream()).filter(ctx -> + ctx.getQueryId() != null && queryId.equals(ctx.getQueryId().toString())).findFirst().orElse(null); + } + + public synchronized void remove(String hostname, Integer connectionId) { + final Map contextMap = connectionMaps.get(hostname); + if (contextMap != null) { + contextMap.remove(connectionId); + if (contextMap.isEmpty()) { + connectionMaps.remove(hostname); + } + } + } + + public static class ScopeGuard implements AutoCloseable { + private ProxyContextManager manager; + private boolean set = false; + private String hostName; + private int connectionId; + + public ScopeGuard(ProxyContextManager manager, String hostName, int connectionId, ConnectContext context, + boolean set) { + if (set) { + this.manager = manager; + this.hostName = hostName; + this.connectionId = connectionId; + manager.addContext(hostName, connectionId, context); + } + this.set = set; + } + + @Override + public void close() throws Exception { + if (set) { + manager.remove(hostName, connectionId); + } + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 40d55dca1d022..1ccd7c8f98d0d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -913,9 +913,14 @@ private void forwardToLeader() throws Exception { if (parsedStmt instanceof ExecuteStmt) { throw new AnalysisException("ExecuteStmt Statement don't support statement need to be forward to leader"); } - leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus); - LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId()); - leaderOpExecutor.execute(); + try { + context.incPendingForwardRequest(); + leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus); + LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId()); + leaderOpExecutor.execute(); + } finally { + context.decPendingForwardRequest(); + } } private boolean tryProcessProfileAsync(ExecPlan plan, int retryIndex) { @@ -1031,16 +1036,31 @@ private void handleKill() throws DdlException { handleKillQuery(killStmt.getQueryId()); } else { long id = killStmt.getConnectionId(); - ConnectContext killCtx = context.getConnectScheduler().getContext(id); + ConnectContext killCtx = null; + if (isProxy) { + final String hostName = context.getProxyHostName(); + killCtx = ProxyContextManager.getInstance().getContext(hostName, (int) id); + } else { + killCtx = context.getConnectScheduler().getContext(id); + } if (killCtx == null) { ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id); } - handleKill(killCtx, killStmt.isConnectionKill()); + handleKill(killCtx, killStmt.isConnectionKill() && !isProxy); } } // Handle kill statement. private void handleKill(ConnectContext killCtx, boolean killConnection) { + try { + if (killCtx.hasPendingForwardRequest()) { + forwardToLeader(); + return; + } + } catch (Exception e) { + LOG.warn("failed to kill connection", e); + } + Preconditions.checkNotNull(killCtx); if (context == killCtx) { // Suicide @@ -1101,6 +1121,9 @@ private void handleKillQuery(String queryId) throws DdlException { if (killCtx == null) { killCtx = ExecuteEnv.getInstance().getScheduler().findContextByQueryId(queryId); } + if (killCtx == null) { + killCtx = ProxyContextManager.getInstance().getContextByQueryId(queryId); + } if (killCtx == null) { ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId); } diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index fbae7a0305252..3e97bae0fd436 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -131,6 +131,7 @@ import com.starrocks.qe.ConnectProcessor; import com.starrocks.qe.DefaultCoordinator; import com.starrocks.qe.GlobalVariable; +import com.starrocks.qe.ProxyContextManager; import com.starrocks.qe.QeProcessorImpl; import com.starrocks.qe.QueryStatisticsInfo; import com.starrocks.qe.ShowExecutor; @@ -351,10 +352,12 @@ public class FrontendServiceImpl implements FrontendService.Iface { private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class); private final LeaderImpl leaderImpl; private final ExecuteEnv exeEnv; + private final ProxyContextManager proxyContextManager; public AtomicLong partitionRequestNum = new AtomicLong(0); public FrontendServiceImpl(ExecuteEnv exeEnv) { leaderImpl = new LeaderImpl(); + proxyContextManager = ProxyContextManager.getInstance(); this.exeEnv = exeEnv; } @@ -1120,12 +1123,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { LOG.info("receive forwarded stmt {} from FE: {}", params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown"); ConnectContext context = new ConnectContext(null); - ConnectProcessor processor = new ConnectProcessor(context); - TMasterOpResult result = processor.proxyExecute(params); - ConnectContext.remove(); - return result; + String hostname = ""; + if (clientAddr != null) { + hostname = clientAddr.getHostname(); + } + context.setProxyHostName(hostname); + boolean addToProxyManager = params.isSetConnectionId(); + final int connectionId = params.getConnectionId(); + + try (var guard = proxyContextManager.guard(hostname, connectionId, context, addToProxyManager)) { + ConnectProcessor processor = new ConnectProcessor(context); + return processor.proxyExecute(params); + } catch (Exception e) { + LOG.warn("unreachable path:", e); + final TMasterOpResult result = new TMasterOpResult(); + result.setErrorMsg(e.getMessage()); + return result; + } } + private void checkPasswordAndLoadPriv(String user, String passwd, String db, String tbl, String clientIp) throws AuthenticationException { GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState(); @@ -1712,7 +1729,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T } } - private TNetworkAddress getClientAddr() { + public TNetworkAddress getClientAddr() { ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext(); // For NonBlockingServer, we can not get client ip. if (connectionContext != null) { diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/ForwardTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/ForwardTest.java new file mode 100644 index 0000000000000..3d9feecb005e7 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/qe/ForwardTest.java @@ -0,0 +1,104 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe; + +import com.starrocks.common.util.UUIDUtil; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.service.ExecuteEnv; +import com.starrocks.service.FrontendServiceImpl; +import com.starrocks.thrift.TMasterOpRequest; +import com.starrocks.thrift.TMasterOpResult; +import com.starrocks.thrift.TNetworkAddress; +import com.starrocks.thrift.TUserIdentity; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Test; + +public class ForwardTest { + + TMasterOpRequest makeRequest() { + TMasterOpRequest request = new TMasterOpRequest(); + request.setCatalog("default"); + request.setCluster("default"); + request.setDb("information_schema"); + request.setQueryId(UUIDUtil.genTUniqueId()); + final TUserIdentity userIdentity = new TUserIdentity(); + request.setCurrent_user_ident(userIdentity); + return request; + } + + @Test + public void testKillConnection() throws Exception { + final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance()); + final TMasterOpRequest request = makeRequest(); + request.setConnectionId(1); + request.setSql("kill QUERY 1"); + new MockUp() { + @Mock + public Long getMaxJournalId() { + return 1L; + } + }; + final TMasterOpResult result = service.forward(request); + Assert.assertEquals(result.errorMsg, ""); + } + + @Test + public void testUpgradeKillConnection() throws Exception { + final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance()); + final TMasterOpRequest request = makeRequest(); + request.setSql("kill QUERY 1"); + new MockUp() { + @Mock + public Long getMaxJournalId() { + return 1L; + } + }; + final TMasterOpResult result = service.forward(request); + Assert.assertEquals(result.errorMsg, "Unknown thread id: 1"); + } + + @Test + public void testKillWithUnknownException() throws Exception { + final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance()); + final TMasterOpRequest request = makeRequest(); + request.setSql("kill QUERY 1"); + request.setConnectionId(1); + + new MockUp() { + @Mock + public Long getMaxJournalId() { + return 1L; + } + }; + new MockUp() { + @Mock + public TNetworkAddress getClientAddr() { + return null; + } + }; + + new MockUp() { + @Mock + public TMasterOpResult proxyExecute(TMasterOpRequest request) { + throw new RuntimeException("unknown exception"); + } + }; + + final TMasterOpResult result = service.forward(request); + Assert.assertEquals(result.errorMsg, "unknown exception"); + } +} diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index a316ea5d61b59..c93abc81dcee3 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -818,6 +818,7 @@ struct TMasterOpRequest { 33: optional Types.TUserRoles user_roles 34: optional i32 forward_times 35: optional string session_id + 36: optional i32 connectionId 101: optional i64 warehouse_id // begin from 101, in case of conflict with other's change }