Skip to content

Commit

Permalink
Fix numWorkItemsArePending bug (#1102)
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson authored Oct 25, 2024
1 parent e55e91a commit 3c47e51
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocument
scopedWorkCoordinator,
rootDocumentContext
);
if (!workCoordinator.workItemsArePending(
if (!workCoordinator.workItemsNotYetComplete(
rootDocumentContext.getWorkCoordinationContext()::createItemsPendingContext
)) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ void completeWorkItem(
* @throws IOException
* @throws InterruptedException
*/
int numWorkItemsArePending(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
int numWorkItemsNotYetComplete(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
throws IOException, InterruptedException;

/**
* @return true if there are any work items that are not yet complete.
* @throws IOException
* @throws InterruptedException
*/
boolean workItemsArePending(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
boolean workItemsNotYetComplete(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
throws IOException, InterruptedException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,11 @@ public void completeWorkItem(
}
}

private int numWorkItemsArePending(
int maxItemsToCheckFor,
private int numWorkItemsNotYetCompleteInternal(
Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier
) throws IOException, InterruptedException {
try (var context = contextSupplier.get()) {
refresh(context::getRefreshContext);
// TODO: Switch this to use _count
log.warn("Switch this to use _count");
final var queryBody = "{\n"
+ "\"query\": {"
+ " \"bool\": {"
Expand All @@ -419,36 +416,43 @@ private int numWorkItemsArePending(
+ " }"
+ " ]"
+ " }"
+ "}"
+ "}";
+ "},"
+ "\"size\": 0" // This sets the number of items to include in the `hits.hits` array, but doesn't affect
+ "}"; // the integer value in `hits.total.value`

var path = INDEX_NAME + "/_search" + (maxItemsToCheckFor <= 0 ? "" : "?size=" + maxItemsToCheckFor);
var path = INDEX_NAME + "/_search";
var response = httpClient.makeJsonRequest(AbstractedHttpClient.POST_METHOD, path, null, queryBody);

final var resultHitsUpper = objectMapper.readTree(response.getPayloadBytes()).path("hits");
var statusCode = response.getStatusCode();
if (statusCode != 200) {
throw new IllegalStateException(
"Querying for pending (expired or not) work, "
+ "returned an unexpected status code "
+ statusCode
+ " instead of 200"
"Querying for pending (expired or not) work, "
+ "returned an unexpected status code "
+ statusCode
+ " instead of 200"
);
}
return resultHitsUpper.path("hits").size();
var payload = objectMapper.readTree(response.getPayloadBytes());
var totalHits = payload.path("hits").path("total").path("value").asInt();
// In the case where totalHits is 0, we need to be particularly sure that we're not missing data. The `relation`
// for the total must be `eq` or we need to throw an error because it's not safe to rely on this data.
if (totalHits == 0 && !payload.path("hits").path("total").path("relation").textValue().equals("eq")) {
throw new IllegalStateException("Querying for notYetCompleted work returned 0 hits with an unexpected total relation.");
}
return totalHits;
}
}

@Override
public int numWorkItemsArePending(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
public int numWorkItemsNotYetComplete(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
throws IOException, InterruptedException {
return numWorkItemsArePending(-1, contextSupplier);
// This result is not guaranteed to be accurate unless it is 0. All numbers greater than 0 are a lower bound.
return numWorkItemsNotYetCompleteInternal(contextSupplier);
}

@Override
public boolean workItemsArePending(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
public boolean workItemsNotYetComplete(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
throws IOException, InterruptedException {
return numWorkItemsArePending(1, contextSupplier) >= 1;
return numWorkItemsNotYetCompleteInternal(contextSupplier) >= 1;
}

enum UpdateResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ public void testAcquireLeaseHasNoUnnecessaryConflicts() throws Exception {
var testContext = WorkCoordinationTestContext.factory().withAllTracking();
final var NUM_DOCS = 100;
try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) {
Assertions.assertFalse(workCoordinator.workItemsArePending(testContext::createItemsPendingContext));
Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
for (var i = 0; i < NUM_DOCS; ++i) {
final var docId = "R" + i;
workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext);
}
Assertions.assertTrue(workCoordinator.workItemsArePending(testContext::createItemsPendingContext));
Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
}

final var seenWorkerItems = new ConcurrentHashMap<String, String>();
Expand Down Expand Up @@ -123,12 +123,12 @@ public void testAcquireLeaseForQuery() throws Exception {
final var MAX_RUNS = 2;
var executorService = Executors.newFixedThreadPool(NUM_DOCS);
try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) {
Assertions.assertFalse(workCoordinator.workItemsArePending(testContext::createItemsPendingContext));
Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
for (var i = 0; i < NUM_DOCS; ++i) {
final var docId = "R" + i;
workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext);
}
Assertions.assertTrue(workCoordinator.workItemsArePending(testContext::createItemsPendingContext));
Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
}

for (int run = 0; run < MAX_RUNS; ++run) {
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testAcquireLeaseForQuery() throws Exception {
Thread.sleep(expiration.multipliedBy(2).toMillis());
}
try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) {
Assertions.assertFalse(workCoordinator.workItemsArePending(testContext::createItemsPendingContext));
Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
}
var metrics = testContext.inMemoryInstrumentationBundle.getFinishedMetrics();
Assertions.assertNotEquals(0,
Expand Down

0 comments on commit 3c47e51

Please sign in to comment.