From 6558e7d61e9056bc8f17ba484590a4870202650d Mon Sep 17 00:00:00 2001 From: yqu63 Date: Tue, 22 Oct 2024 17:46:10 -0400 Subject: [PATCH 1/3] feat(OverseerTaskQueue): allow remove item from OverseerTaskQueue by async id --- .../apache/solr/cloud/OverseerTaskQueue.java | 48 ++++++++++++++++--- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java index 3092dce185e..46d8e831ac4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java @@ -73,23 +73,32 @@ public void allowOverseerPendingTasksToComplete() { } } - /** Returns true if the queue contains a task with the specified async id. */ - public boolean containsTaskWithRequestId(String requestIdKey, String requestId) - throws KeeperException, InterruptedException { + /** + * Helper method to find the path of the first task with a specific request ID. + * + * @param requestIdKey The key of the request ID in the task message. + * @param requestId The request ID to look for. + * @return The path of the task if found, or null if not found. 
+ */ + private String findTaskWithRequestId(String requestIdKey, String requestId) + throws KeeperException, InterruptedException { List childNames = zookeeper.getChildren(dir, null, true); stats.setQueueLength(childNames.size()); for (String childName : childNames) { if (childName != null && childName.startsWith(PREFIX)) { + String path = dir + "/" + childName; try { - byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true); + byte[] data = zookeeper.getData(path, null, null, true); if (data != null) { ZkNodeProps message = ZkNodeProps.load(data); if (message.containsKey(requestIdKey)) { if (log.isDebugEnabled()) { - log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId); + log.debug("Looking for requestId '{}', found '{}'", requestId, message.get(requestIdKey)); + } + if (message.get(requestIdKey).equals(requestId)) { + return path; } - if (message.get(requestIdKey).equals(requestId)) return true; } } } catch (KeeperException.NoNodeException e) { @@ -97,8 +106,33 @@ public boolean containsTaskWithRequestId(String requestIdKey, String requestId) } } } + return null; + } + + /** Returns true if the queue contains a task with the specified request ID. */ + public boolean containsTaskWithRequestId(String requestIdKey, String requestId) + throws KeeperException, InterruptedException { + + String path = findTaskWithRequestId(requestIdKey, requestId); + return path != null; + } - return false; + /** Removes the first task with the specified request ID from the queue. */ + public void removeTaskWithRequestId(String requestIdKey, String requestId) + throws KeeperException, InterruptedException { + + String path = findTaskWithRequestId(requestIdKey, requestId); + if (path != null) { + try { + zookeeper.delete(path, -1, true); + log.info("Removed task with requestId '{}' at path {}", requestId, path); + } catch (KeeperException.NoNodeException e) { + // Node might have been removed already + log.warn("Node at path {} does not exist. 
It might have been removed already.", path); + } + } else { + log.info("No task with requestId '{}' was found in the queue.", requestId); + } } /** Remove the event and save the response into the other path. */ From 3382124b2fe8c92162d6f42620eef2ac18c8540b Mon Sep 17 00:00:00 2001 From: yqu63 Date: Tue, 22 Oct 2024 17:50:23 -0400 Subject: [PATCH 2/3] pass in asyncIdMap to OverseerTaskProcessor --- .../java/org/apache/solr/cloud/Overseer.java | 2 +- .../OverseerCollectionConfigSetProcessor.java | 3 +++ .../solr/cloud/OverseerTaskProcessor.java | 19 +++++++++++++------ 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index a57270c23db..d1534d277a8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -1073,7 +1073,7 @@ static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stat return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats); } - /* Internal map for failed tasks, not to be used outside of the Overseer */ + /* Internal map for running tasks, not to be used outside the Overseer */ static DistributedMap getRunningMap(final SolrZkClient zkClient) { return new DistributedMap(zkClient, "/overseer/collection-map-running"); } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java index 9536a09b6d5..e890879b36c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java @@ -54,6 +54,7 @@ public OverseerCollectionConfigSetProcessor( Overseer.getRunningMap(zkStateReader.getZkClient()), Overseer.getCompletedMap(zkStateReader.getZkClient()), 
Overseer.getFailureMap(zkStateReader.getZkClient()), + Overseer.getAsyncIdsMap(zkStateReader.getZkClient()), solrMetricsContext); } @@ -69,6 +70,7 @@ protected OverseerCollectionConfigSetProcessor( DistributedMap runningMap, DistributedMap completedMap, DistributedMap failureMap, + DistributedMap asyncIdMap, SolrMetricsContext solrMetricsContext) { super( zkStateReader, @@ -87,6 +89,7 @@ protected OverseerCollectionConfigSetProcessor( runningMap, completedMap, failureMap, + asyncIdMap, solrMetricsContext); } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java index eb5661885c4..3fb9ff44de1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java @@ -75,6 +75,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { private DistributedMap runningMap; private DistributedMap completedMap; private DistributedMap failureMap; + private DistributedMap asyncIdMap; /** * Set that maintains a list of all the tasks that are running. 
This is keyed on zk id of the @@ -144,6 +145,7 @@ public OverseerTaskProcessor( DistributedMap runningMap, DistributedMap completedMap, DistributedMap failureMap, + DistributedMap asyncIdMap, SolrMetricsContext solrMetricsContext) { this.zkStateReader = zkStateReader; this.myId = myId; @@ -154,6 +156,7 @@ public OverseerTaskProcessor( this.runningMap = runningMap; this.completedMap = completedMap; this.failureMap = failureMap; + this.asyncIdMap = asyncIdMap; this.runningZKTasks = ConcurrentHashMap.newKeySet(); this.runningTasks = ConcurrentHashMap.newKeySet(); this.completedTasks = new ConcurrentHashMap<>(); @@ -324,12 +327,16 @@ public void run() { final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); final String asyncId = message.getStr(ASYNC); if (hasLeftOverItems) { - if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false; - if (asyncId != null - && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) { - log.debug( - "Found already processed task in workQueue, cleaning up. AsyncId [{}]", - asyncId); + if (head.getId().equals(oldestItemInWorkQueue)) + hasLeftOverItems = false; + if (asyncId != null){ + if (completedMap.contains(asyncId) || failureMap.contains(asyncId)) { + log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]", + asyncId); + } + if (!asyncIdMap.contains(asyncId)){ + log.debug("async id has been removed by canceling. 
AsyncId [{}]", asyncId); + } workQueue.remove(head); continue; } From b8a271301e8c9d83284fd214297259dee0fed98a Mon Sep 17 00:00:00 2001 From: yqu63 Date: Tue, 22 Oct 2024 17:52:48 -0400 Subject: [PATCH 3/3] deleteStatus for submitted task --- .../org/apache/solr/handler/admin/CollectionsHandler.java | 8 ++++++++ .../apache/solr/common/params/CollectionAdminParams.java | 2 ++ 2 files changed, 10 insertions(+) diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 7ee6da6a0c1..c7f832516ae 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -866,6 +866,7 @@ public Map execute( final String requestId = req.getParams().get(REQUESTID); final ZkController zkController = coreContainer.getZkController(); boolean flush = req.getParams().getBool(CollectionAdminParams.FLUSH, false); + boolean force = req.getParams().getBool(CollectionAdminParams.FORCE, false); if (requestId == null && !flush) { throw new SolrException( @@ -904,6 +905,13 @@ public Map execute( rsp.getValues() .add( "status", "successfully removed stored response for [" + requestId + "]"); + } else if (force && !zkController.getOverseerRunningMap().contains(requestId) && zkController.getOverseerCollectionQueue().containsTaskWithRequestId(ASYNC,requestId)) { + // submitted but not started yet + zkController.getOverseerCollectionQueue().removeTaskWithRequestId(ASYNC,requestId); + zkController.clearAsyncId(requestId); + rsp.getValues() + .add( + "status", "successfully removed submitted task for [" + requestId + "]"); } else { rsp.getValues() .add("status", "[" + requestId + "] not found in stored responses"); diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java index 
a7c6043fb19..f60312d38b6 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java @@ -25,6 +25,8 @@ public interface CollectionAdminParams { /* Param used by DELETESTATUS call to clear all stored responses */ String FLUSH = "flush"; + String FORCE = "force"; + String COLLECTION = "collection"; String COUNT_PROP = "count";