Skip to content

Commit

Permalink
Support terminate_after force termination for concurrent segment search
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Sep 23, 2023
1 parent c178d8e commit d2bf2f5
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,40 @@ public void testSimpleTerminateAfterCountWithSizeAndTrackHits() throws Exception
assertEquals(0, searchResponse.getFailedShards());
}

public void testSimpleTerminateAfterCount() throws Exception {
prepareCreate("test").setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)).get();
ensureGreen();
int max = randomIntBetween(3, 29);
List<IndexRequestBuilder> docbuilders = new ArrayList<>(max);

for (int i = 1; i <= max; i++) {
String id = String.valueOf(i);
docbuilders.add(client().prepareIndex("test").setId(id).setSource("field", i));
}

indexRandom(true, docbuilders);
ensureGreen();
refresh();

SearchResponse searchResponse;
for (int i = 1; i < max; i++) {
searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.rangeQuery("field").gte(1).lte(max))
.setTerminateAfter(i)
.get();
assertHitCount(searchResponse, i);
assertTrue(searchResponse.isTerminatedEarly());
}

searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.rangeQuery("field").gte(1).lte(max))
.setTerminateAfter(2 * max)
.get();

assertHitCount(searchResponse, max);
assertFalse(searchResponse.isTerminatedEarly());
}

public void testSimpleIndexSortEarlyTerminate() throws Exception {
prepareCreate("test").setSettings(
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.sort.field", "rank")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.EarlyTerminatingCollector;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {

// Check if at all we need to call this leaf for collecting results.
if (canMatch(ctx) == false) {
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
return;
}

Expand All @@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand All @@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searchContext.setTerminatedEarly(true);
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;
private volatile boolean terminatedEarly;

protected SearchContext() {}

Expand All @@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut;
}

public boolean isTerminatedEarly() {
return this.terminatedEarly;
}

public void setTerminatedEarly(boolean terminatedEarly) {
this.terminatedEarly = terminatedEarly;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}

return topDocsFactory.shouldRescore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,27 @@
import org.apache.lucene.search.LeafCollector;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
*
* @opensearch.internal
*/
public class EarlyTerminatingCollector extends FilterCollector {
static final class EarlyTerminationException extends RuntimeException {

/**
* Force termination exception
*/
public static final class EarlyTerminationException extends RuntimeException {
EarlyTerminationException(String msg) {
super(msg);
}
}

private final int maxCountHits;
private int numCollected;
private boolean forceTermination;
private final AtomicLong numCollected;
private final boolean forceTermination;
private boolean earlyTerminated;

/**
Expand All @@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
super(delegate);
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = numCollected;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
if (numCollected >= maxCountHits) {
if (numCollected.get() >= maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand All @@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
return new FilterLeafCollector(super.getLeafCollector(context)) {
@Override
public void collect(int doc) throws IOException {
if (++numCollected > maxCountHits) {
if (numCollected.incrementAndGet() > maxCountHits) {
earlyTerminated = true;
if (forceTermination) {
throw new EarlyTerminationException("early termination [CountBased]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manager for the EarlyTerminatingCollector
Expand All @@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
private final CollectorManager<C, ReduceableSearchResult> manager;
private final int maxCountHits;
private boolean forceTermination;
private final AtomicLong numCollected;

EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
this.manager = manager;
this.maxCountHits = maxCountHits;
this.forceTermination = forceTermination;
this.numCollected = new AtomicLong();
}

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
return new EarlyTerminatingCollector(
manager.newCollector(),
maxCountHits,
forceTermination /* forced termination is not supported */,
numCollected
);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
}

try {
// EarlyTerminationException gets swallowed here?
boolean shouldRescore = queryPhaseSearcher.searchWith(
searchContext,
searcher,
Expand Down Expand Up @@ -350,9 +351,8 @@ private static boolean searchWithCollector(
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
searcher.search(query, queryCollector);
if (searchContext.isTerminatedEarly()) {
queryResult.terminatedEarly(true);
}
if (searchContext.isSearchTimedOut()) {
Expand Down

0 comments on commit d2bf2f5

Please sign in to comment.