Fix numWorkItemsArePending bug #1102

Merged
Changes from 3 commits
@@ -397,14 +397,11 @@
         }
     }

-    private int numWorkItemsArePending(
-        int maxItemsToCheckFor,
+    private int numWorkItemsArePendingInternal(
         Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier
     ) throws IOException, InterruptedException {
         try (var context = contextSupplier.get()) {
             refresh(context::getRefreshContext);
-            // TODO: Switch this to use _count
-            log.warn("Switch this to use _count");
Comment on lines -406 to -407
Collaborator

For future reference: the author of this PR found that _count was slower than the search.

             final var queryBody = "{\n"
                 + "\"query\": {"
                 + " \"bool\": {"
@@ -419,36 +416,35 @@
                 + " }"
                 + " ]"
                 + " }"
                 + "}"
                 + "},"
                 + "\"size\": 1"
                 + "}";

-            var path = INDEX_NAME + "/_search" + (maxItemsToCheckFor <= 0 ? "" : "?size=" + maxItemsToCheckFor);
+            var path = INDEX_NAME + "/_search";
             var response = httpClient.makeJsonRequest(AbstractedHttpClient.POST_METHOD, path, null, queryBody);

-            final var resultHitsUpper = objectMapper.readTree(response.getPayloadBytes()).path("hits");
             var statusCode = response.getStatusCode();
             if (statusCode != 200) {
                 throw new IllegalStateException(
-                    "Querying for pending (expired or not) work, "
-                        + "returned an unexpected status code "
-                        + statusCode
-                        + " instead of 200"
+                    "Querying for pending (expired or not) work, "
+                        + "returned an unexpected status code "
+                        + statusCode
+                        + " instead of 200"
                 );
             }
-            return resultHitsUpper.path("hits").size();
+            return objectMapper.readTree(response.getPayloadBytes()).path("hits").path("total").path("value").asInt();
         }
     }

     @Override
     public int numWorkItemsArePending(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
         throws IOException, InterruptedException {
-        return numWorkItemsArePending(-1, contextSupplier);
+        return numWorkItemsArePendingInternal(contextSupplier);

Check warning on line 441 in RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java

Codecov / codecov/patch: Added line #L441 was not covered by tests
Collaborator

How do you get back the right value for this if you're running the query with size=1?

     }

     @Override
     public boolean workItemsArePending(Supplier<IWorkCoordinationContexts.IPendingWorkItemsContext> contextSupplier)
         throws IOException, InterruptedException {
-        return numWorkItemsArePending(1, contextSupplier) >= 1;
+        return numWorkItemsArePendingInternal(contextSupplier) >= 1;
Collaborator

This is a very different call now. My original intention was that in this case, I really only needed to know if the set was empty or not. Counting up thousands of documents won't be necessary. If -1 wasn't working to return the whole list, we should probably just have two separate functions - isEmpty() and count().
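
One way to read the suggestion above is as two narrowly scoped methods, so that callers who only need an emptiness check never pay for a full count. The sketch below is hypothetical: `PendingWorkQueries`, `hasPendingWork`, `countPendingWork`, and the in-memory `FakePendingWork` are illustrative names that do not appear in this PR, the fake stands in for the real `_search` calls, and the ids other than `R492` are made up.

```java
import java.util.List;

// Hypothetical sketch: every name below is illustrative and not part of the PR.
class PendingWorkSketch {

    /** Splits the "is anything pending?" question from the "how many?" question. */
    interface PendingWorkQueries {
        /** Cheap emptiness check; an implementation may stop at the first match. */
        boolean hasPendingWork();

        /** Full count; an implementation has to account for every matching document. */
        int countPendingWork();
    }

    /** In-memory stand-in for the index, used only to make the sketch runnable. */
    static final class FakePendingWork implements PendingWorkQueries {
        private final List<String> pendingIds;

        FakePendingWork(List<String> pendingIds) {
            this.pendingIds = List.copyOf(pendingIds);
        }

        @Override
        public boolean hasPendingWork() {
            // Only asks whether at least one item exists; it never needs the total.
            return !pendingIds.isEmpty();
        }

        @Override
        public int countPendingWork() {
            return pendingIds.size();
        }
    }

    public static void main(String[] args) {
        PendingWorkQueries queries = new FakePendingWork(List.of("R492", "R493", "R494"));
        System.out.println("pending? " + queries.hasPendingWork());
        System.out.println("count:   " + queries.countPendingWork());
    }
}
```

With this split, `workItemsArePending` would map onto the first method and `numWorkItemsArePending` onto the second, and each could choose its own query shape.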

Collaborator Author

Okay, I ran some tests with a couple thousand items in the index. (It turns out that adding 3790 items in quick succession basically locks up a testcontainer cluster: even with 30-second sleeps between attempts, I haven't been able to add a single document to it in over 10 minutes. Our worst-case situations for blocking indices are really bad.)

The tests here are (1) _count, (2) _search with size=1, and (3) _search with terminate_after=1. I ran each test 5 times, but I think we should compare only the first run of each, because the later runs look like they're served from cache.

_count: 0.110 s total
_search with size=1: 0.058 s total
_search with terminate_after=1: 0.080 s total

Conveniently, _search with size=1 includes a block like "hits":{"total":{"value":3790,"relation":"eq"}}, so it is actually computing the total number of hits (either exactly or as an approximation). I do wonder how it does that while still being about twice as fast as _count, but we might as well take advantage of it and replace the whole query here with this. Amusingly, this is what I originally did, but then I saw the note about _count and reworked it.
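
The `relation` field next to `value` tells the two cases apart: `"eq"` marks an exact total, while `"gte"` marks a lower bound. By default, totals are tracked accurately only up to 10,000 hits unless `track_total_hits` is set on the request. A minimal sketch of reading both fields follows. It uses a regular expression only to stay dependency-free, whereas the PR itself walks the tree with Jackson; `TotalHits` and `parseTotalHits` are hypothetical names.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: TotalHits and parseTotalHits are illustrative names, not part of the PR.
class TotalHitsSketch {

    /** The hits.total block of a _search response. */
    static final class TotalHits {
        final long value;
        final String relation;

        TotalHits(long value, String relation) {
            this.value = value;
            this.relation = relation;
        }

        /** "eq" means value is the exact total; "gte" means it is only a lower bound. */
        boolean isExact() {
            return "eq".equals(relation);
        }
    }

    // Matches "total":{"value":<n>,"relation":"<r>"} as it appears in the responses in this thread.
    // The regex keeps the sketch dependency-free; real code should use a JSON parser.
    private static final Pattern TOTAL = Pattern.compile(
        "\"total\"\\s*:\\s*\\{\\s*\"value\"\\s*:\\s*(\\d+)\\s*,\\s*\"relation\"\\s*:\\s*\"(\\w+)\"\\s*\\}");

    static TotalHits parseTotalHits(String searchResponseJson) {
        Matcher m = TOTAL.matcher(searchResponseJson);
        if (!m.find()) {
            throw new IllegalArgumentException("no hits.total object in response");
        }
        return new TotalHits(Long.parseLong(m.group(1)), m.group(2));
    }

    public static void main(String[] args) {
        String response = "{\"took\":17,\"timed_out\":false,"
            + "\"hits\":{\"total\":{\"value\":3790,\"relation\":\"eq\"},\"max_score\":1.0,\"hits\":[]}}";
        TotalHits total = parseTotalHits(response);
        System.out.println(total.value + " pending, exact=" + total.isExact());
    }
}
```

A caller that needs an exact count could treat a `"gte"` relation as a signal to re-query with `track_total_hits` enabled, while an emptiness check only needs `value >= 1`.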

❯ for i in {1..5}; do time curl http://localhost:58803/.migrations_working_state/_count -H 'Content-Type: application/json' -d '{"query": {"match_all": {}}}'; done
{"count":3790,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}curl http://localhost:58803/.migrations_working_state/_count -H  -d   0.00s user 0.01s system 6% cpu 0.110 total
{"count":3790,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}curl http://localhost:58803/.migrations_working_state/_count -H  -d   0.00s user 0.01s system 10% cpu 0.068 total
{"count":3790,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}curl http://localhost:58803/.migrations_working_state/_count -H  -d   0.00s user 0.01s system 24% cpu 0.032 total
{"count":3790,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}curl http://localhost:58803/.migrations_working_state/_count -H  -d   0.00s user 0.00s system 22% cpu 0.027 total
{"count":3790,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}curl http://localhost:58803/.migrations_working_state/_count -H  -d   0.00s user 0.00s system 25% cpu 0.023 total
❯
❯
❯ for i in {1..5}; do time curl http://localhost:58803/.migrations_working_state/_search -H 'Content-Type: application/json' -d '{"query": {"match_all": {}}, "size": 1}'; done
{"took":17,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3790,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 12% cpu 0.058 total
{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3790,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 23% cpu 0.029 total
{"took":13,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3790,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 16% cpu 0.042 total
{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3790,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 25% cpu 0.026 total
{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3790,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 26% cpu 0.025 total
❯
❯
❯ for i in {1..5}; do time curl http://localhost:58803/.migrations_working_state/_search -H 'Content-Type: application/json' -d '{"query": {"match_all": {}}, "terminate_after": 1}'; done
{"took":21,"timed_out":false,"terminated_early":true,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.01s system 11% cpu 0.080 total
{"took":3,"timed_out":false,"terminated_early":true,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 29% cpu 0.024 total
{"took":3,"timed_out":false,"terminated_early":true,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 30% cpu 0.024 total
{"took":4,"timed_out":false,"terminated_early":true,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.00s system 23% cpu 0.026 total
{"took":2,"timed_out":false,"terminated_early":true,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":".migrations_working_state","_type":"_doc","_id":"R492","_score":1.0,"_source":{"numAttempts":0,"scriptVersion":"poc","creatorId":"docCreatorWorker","expiration":0}}]}}curl http://localhost:58803/.migrations_working_state/_search -H  -d   0.00s user 0.01s system 31% cpu 0.025 total
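
To put numbers on the cold-versus-warm observation, the sketch below summarizes the five `total` times printed by each loop: the first run, and the mean of the remaining four. The class and method names are illustrative, and a single cold sample per variant is a thin basis for ranking them.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: summarizes the wall-clock "total" seconds printed by the three loops in this comment.
class TimingSummary {

    /** First (cold) run in seconds. */
    static double cold(double[] runs) {
        return runs[0];
    }

    /** Mean of the remaining (warm) runs in seconds. */
    static double warmMean(double[] runs) {
        return Arrays.stream(runs, 1, runs.length).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        Map<String, double[]> timings = new LinkedHashMap<>();
        timings.put("_count", new double[] {0.110, 0.068, 0.032, 0.027, 0.023});
        timings.put("_search size=1", new double[] {0.058, 0.029, 0.042, 0.026, 0.025});
        timings.put("_search terminate_after=1", new double[] {0.080, 0.024, 0.024, 0.026, 0.025});

        timings.forEach((name, runs) ->
            System.out.printf("%-26s cold=%.3fs warm mean=%.4fs%n", name, cold(runs), warmMean(runs)));
    }
}
```

On these samples the warm means are about 0.0375 s, 0.0305 s, and 0.0248 s, so the three variants differ far less once warm than they do on the first run.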

     }

     enum UpdateResult {