Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-6122: POC cancel submitted not started async task #2790

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion solr/core/src/java/org/apache/solr/cloud/Overseer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stat
return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
}

/* Internal map for failed tasks, not to be used outside of the Overseer */
/* Internal map for running tasks, not to be used outside the Overseer */
static DistributedMap getRunningMap(final SolrZkClient zkClient) {
return new DistributedMap(zkClient, "/overseer/collection-map-running");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public OverseerCollectionConfigSetProcessor(
Overseer.getRunningMap(zkStateReader.getZkClient()),
Overseer.getCompletedMap(zkStateReader.getZkClient()),
Overseer.getFailureMap(zkStateReader.getZkClient()),
Overseer.getAsyncIdsMap(zkStateReader.getZkClient()),
solrMetricsContext);
}

Expand All @@ -69,6 +70,7 @@ protected OverseerCollectionConfigSetProcessor(
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap,
DistributedMap asyncIdMap,
SolrMetricsContext solrMetricsContext) {
super(
zkStateReader,
Expand All @@ -87,6 +89,7 @@ protected OverseerCollectionConfigSetProcessor(
runningMap,
completedMap,
failureMap,
asyncIdMap,
solrMetricsContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private DistributedMap runningMap;
private DistributedMap completedMap;
private DistributedMap failureMap;
private DistributedMap asyncIdMap;

/**
* Set that maintains a list of all the tasks that are running. This is keyed on zk id of the
Expand Down Expand Up @@ -144,6 +145,7 @@ public OverseerTaskProcessor(
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap,
DistributedMap asyncIdMap,
SolrMetricsContext solrMetricsContext) {
this.zkStateReader = zkStateReader;
this.myId = myId;
Expand All @@ -154,6 +156,7 @@ public OverseerTaskProcessor(
this.runningMap = runningMap;
this.completedMap = completedMap;
this.failureMap = failureMap;
this.asyncIdMap = asyncIdMap;
this.runningZKTasks = ConcurrentHashMap.newKeySet();
this.runningTasks = ConcurrentHashMap.newKeySet();
this.completedTasks = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -324,12 +327,16 @@ public void run() {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false;
if (asyncId != null
&& (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
log.debug(
"Found already processed task in workQueue, cleaning up. AsyncId [{}]",
asyncId);
if (head.getId().equals(oldestItemInWorkQueue))
hasLeftOverItems = false;
if (asyncId != null){
if (completedMap.contains(asyncId) || failureMap.contains(asyncId)) {
log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",
asyncId);
}
if (!asyncIdMap.contains(asyncId)){
log.debug("async id has been removed by canceling. AsyncId [{}]", asyncId);
}
workQueue.remove(head);
continue;
}
Expand Down
48 changes: 41 additions & 7 deletions solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,66 @@ public void allowOverseerPendingTasksToComplete() {
}
}

/** Returns true if the queue contains a task with the specified async id. */
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
/**
* Helper method to find the path of a first task with a specific request ID.
*
* @param requestIdKey The key of the request ID in the task message.
* @param requestId The request ID to look for.
* @return The path of the task if found, or null if not found.
*/
private String findTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {

List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (childName != null && childName.startsWith(PREFIX)) {
String path = dir + "/" + childName;
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
byte[] data = zookeeper.getData(path, null, null, true);
if (data != null) {
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
if (log.isDebugEnabled()) {
log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
log.debug("Looking for requestId '{}', found '{}'", requestId, message.get(requestIdKey));
}
if (message.get(requestIdKey).equals(requestId)) {
return path;
}
if (message.get(requestIdKey).equals(requestId)) return true;
}
}
} catch (KeeperException.NoNodeException e) {
// Another client removed the node first, try next
}
}
}
return null;
}

/** Returns true if the queue contains a task with the specified request ID. */
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {

String path = findTaskWithRequestId(requestIdKey, requestId);
return path != null;
}

return false;
/** Removes the first task with the specified request ID from the queue. */
public void removeTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {

String path = findTaskWithRequestId(requestIdKey, requestId);
if (path != null) {
try {
zookeeper.delete(path, -1, true);
log.info("Removed task with requestId '{}' at path {}", requestId, path);
} catch (KeeperException.NoNodeException e) {
// Node might have been removed already
log.warn("Node at path {} does not exist. It might have been removed already.", path);
}
} else {
log.info("No task with requestId '{}' was found in the queue.", requestId);
}
}

/** Remove the event and save the response into the other path. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ public Map<String, Object> execute(
final String requestId = req.getParams().get(REQUESTID);
final ZkController zkController = coreContainer.getZkController();
boolean flush = req.getParams().getBool(CollectionAdminParams.FLUSH, false);
boolean force = req.getParams().getBool(CollectionAdminParams.FORCE, false);

if (requestId == null && !flush) {
throw new SolrException(
Expand Down Expand Up @@ -904,6 +905,13 @@ public Map<String, Object> execute(
rsp.getValues()
.add(
"status", "successfully removed stored response for [" + requestId + "]");
} else if (force && !zkController.getOverseerRunningMap().contains(requestId) && zkController.getOverseerCollectionQueue().containsTaskWithRequestId(ASYNC,requestId)) {
// submitted but not started yet
zkController.getOverseerCollectionQueue().removeTaskWithRequestId(ASYNC,requestId);
zkController.clearAsyncId(requestId);
rsp.getValues()
.add(
"status", "successfully removed submitted task for [" + requestId + "]");
} else {
rsp.getValues()
.add("status", "[" + requestId + "] not found in stored responses");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface CollectionAdminParams {
/* Param used by DELETESTATUS call to clear all stored responses */
String FLUSH = "flush";

String FORCE = "force";

String COLLECTION = "collection";

String COUNT_PROP = "count";
Expand Down
Loading