From 89198da9608abaadfd20ed6f5956346eb156001b Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Thu, 24 Oct 2024 16:52:36 -0600 Subject: [PATCH] Main logic and tests for successor work items Signed-off-by: Mikayla Thompson --- .../tracing/IWorkCoordinationContexts.java | 8 + .../tracing/RootWorkCoordinationContext.java | 13 +- .../tracing/WorkCoordinationContexts.java | 48 +++++ .../workcoordination/IWorkCoordinator.java | 2 + .../OpenSearchWorkCoordinator.java | 127 ++++++++++++-- .../workcoordination/WorkCoordinatorTest.java | 165 +++++++++++++++++- 6 files changed, 348 insertions(+), 15 deletions(-) diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java index 3781e60c9..28ef5193b 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/IWorkCoordinationContexts.java @@ -13,6 +13,7 @@ class ActivityNames { public static final String ACQUIRE_SPECIFIC_WORK = "acquireSpecificWorkItem"; public static final String COMPLETE_WORK = "completeWork"; public static final String ACQUIRE_NEXT_WORK = "acquireNextWorkItem"; + public static final String CREATE_SUCCESSOR_WORK_ITEMS = "createSuccessorWorkItems"; private ActivityNames() {} } @@ -76,6 +77,13 @@ interface ICompleteWorkItemContext extends IRetryableActivityContext { IRefreshContext getRefreshContext(); } + interface ICreateSuccessorWorkItemsContext extends IRetryableActivityContext { + String ACTIVITY_NAME = ActivityNames.CREATE_SUCCESSOR_WORK_ITEMS; + IRefreshContext getRefreshContext(); + ICompleteWorkItemContext getCompleteWorkItemContext(); + ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext(); + } + interface IScopedWorkContext extends IScopedInstrumentationAttributes { C createOpeningContext(); diff --git 
a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java index ee36f5ed5..82d3f6ee2 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/RootWorkCoordinationContext.java @@ -18,6 +18,7 @@ public class RootWorkCoordinationContext extends RootOtelContext { public final WorkCoordinationContexts.AcquireSpecificWorkContext.MetricInstruments acquireSpecificWorkMetrics; public final WorkCoordinationContexts.CompleteWorkItemContext.MetricInstruments completeWorkMetrics; public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics; + public final WorkCoordinationContexts.CreateSuccessorWorkItemsContext.MetricInstruments createSuccessorWorkItemsMetrics; public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) { this(sdk, contextTracker, null); @@ -38,6 +39,7 @@ public RootWorkCoordinationContext(OpenTelemetry sdk, acquireSpecificWorkMetrics = WorkCoordinationContexts.AcquireSpecificWorkContext.makeMetrics(meter); completeWorkMetrics = WorkCoordinationContexts.CompleteWorkItemContext.makeMetrics(meter); acquireNextWorkMetrics = WorkCoordinationContexts.AcquireNextWorkItemContext.makeMetrics(meter); + createSuccessorWorkItemsMetrics = WorkCoordinationContexts.CreateSuccessorWorkItemsContext.makeMetrics(meter); } public IWorkCoordinationContexts.IInitializeCoordinatorStateContext createCoordinationInitializationStateContext() { @@ -87,4 +89,13 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createCompleteWorkCont ) { return new WorkCoordinationContexts.CompleteWorkItemContext(this, enclosingScope); } -} + + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() { + return 
createSuccessorWorkItemsContext(null); + } + + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext( + IScopedInstrumentationAttributes enclosingScope + ) { + return new WorkCoordinationContexts.CreateSuccessorWorkItemsContext(this, enclosingScope); + }} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java index ff0a26362..58a4aa5f7 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/tracing/WorkCoordinationContexts.java @@ -363,4 +363,52 @@ public MetricInstruments getRetryMetrics() { return getRootInstrumentationScope().completeWorkMetrics; } } + + @Getter + class CreateSuccessorWorkItemsContext extends BaseSpanContext + implements + ICreateSuccessorWorkItemsContext, + RetryableActivityContextMetricMixin { + final IScopedInstrumentationAttributes enclosingScope; + + CreateSuccessorWorkItemsContext( + RootWorkCoordinationContext rootScope, + IScopedInstrumentationAttributes enclosingScope + ) { + super(rootScope); + this.enclosingScope = enclosingScope; + initializeSpan(rootScope); + } + + @Override + public String getActivityName() { + return ACTIVITY_NAME; + } + + @Override + public IRefreshContext getRefreshContext() { + return new Refresh(this.rootInstrumentationScope, this); + } + + @Override + public ICompleteWorkItemContext getCompleteWorkItemContext() { return new CompleteWorkItemContext(this.rootInstrumentationScope, this); } + + @Override + public ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext() { return new CreateUnassignedWorkItemContext(this.rootInstrumentationScope, this); } + + public static class MetricInstruments extends RetryMetricInstruments { + private MetricInstruments(Meter meter, String activityName) { + super(meter, 
autoLabels(activityName)); + } + } + + public static @NonNull MetricInstruments makeMetrics(Meter meter) { + return new MetricInstruments(meter, ACTIVITY_NAME); + } + + @Override + public MetricInstruments getRetryMetrics() { + return getRootInstrumentationScope().createSuccessorWorkItemsMetrics; + } + } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java index 595fdae20..acaa7d7ed 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java @@ -4,6 +4,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.function.Supplier; import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; @@ -170,6 +171,7 @@ class LeaseLockHeldElsewhereException extends RuntimeException {} class WorkItemAndDuration implements WorkAcquisitionOutcome { final String workItemId; final Instant leaseExpirationTime; + final ArrayList successorWorkItems; @Override public T visit(WorkAcquisitionOutcomeVisitor v) throws IOException, InterruptedException { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index 4f33b9d73..2f1d5176c 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -5,6 +5,9 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Optional; import 
java.util.function.BiPredicate; import java.util.function.Function; @@ -37,7 +40,8 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator { public static final String CLIENT_TIMESTAMP_TEMPLATE = "{CLIENT_TIMESTAMP}"; public static final String EXPIRATION_WINDOW_TEMPLATE = "{EXPIRATION_WINDOW}"; public static final String CLOCK_DEVIATION_SECONDS_THRESHOLD_TEMPLATE = "{CLOCK_DEVIATION_SECONDS_THRESHOLD}"; - public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "OLD_EXPIRATION_THRESHOLD"; + public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "{OLD_EXPIRATION_THRESHOLD}"; + public static final String SUCCESSOR_WORK_ITEM_IDS_TEMPLATE = "{SUCCESSOR_WORK_ITEM_IDS}"; public static final String RESULT_OPENSSEARCH_FIELD_NAME = "result"; public static final String EXPIRATION_FIELD_NAME = "expiration"; @@ -46,6 +50,7 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator { public static final String VERSION_CONFLICTS_FIELD_NAME = "version_conflicts"; public static final String COMPLETED_AT_FIELD_NAME = "completedAt"; public static final String SOURCE_FIELD_NAME = "_source"; + public static final String SUCCESSOR_ITEMS_FIELD_NAME = "successor_items"; public static final String QUERY_INCOMPLETE_EXPIRED_ITEMS_STR = " \"query\": {\n" + " \"bool\": {" @@ -138,6 +143,10 @@ public void setup(Supplier 0 && " - + // don't obtain a lease lock - " ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" - + // already done - " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " + + " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {" + + // already done + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && " + " ctx._source." 
+ EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {" + // count as an update to force the caller to lookup the expiration time, but no need to modify it - " ctx.op = \\\"update\\\";" + " ctx.op = \\\"update\\\";" + " } else if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired " ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check " ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" @@ -321,7 +328,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( var resultFromUpdate = getResult(updateResponse); if (resultFromUpdate == DocumentModificationResult.CREATED) { - return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration)); + return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration), new ArrayList<>()); // todo } else { final var httpResponse = httpClient.makeJsonRequest( AbstractedHttpClient.GET_METHOD, @@ -333,7 +340,8 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( if (resultFromUpdate == DocumentModificationResult.UPDATED) { return new WorkItemAndDuration( workItemId, - Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()) + Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()), + new ArrayList() // todo ); } else if (!responseDoc.path(COMPLETED_AT_FIELD_NAME).isMissingNode()) { return new AlreadyCompleted(); @@ -596,7 +604,9 @@ private WorkItemAndDuration getAssignedWorkItemUnsafe() "Expiration wasn't found or wasn't set to > 0 for response:" + response.toDiagnosticString()).log(); throw new MalformedAssignedWorkDocumentException(response); } - var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration)); + + var successorItems = resultHitInner.has(SUCCESSOR_ITEMS_FIELD_NAME) ? 
new ArrayList<>(Arrays.asList(resultHitInner.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(","))) : new ArrayList(); + var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration), successorItems); log.atInfo().setMessage(() -> "Returning work item and lease: " + rval).log(); return rval; } @@ -640,6 +650,103 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker, } } + private void updateWorkItemWithSuccessors(String workItemId, ArrayList successorWorkItemIds) throws IOException { + final var updateSuccessorWorkItemsTemplate = "{\n" + + " \"script\": {\n" + + " \"lang\": \"painless\",\n" + + " \"params\": { \n" + + " \"clientTimestamp\": " + CLIENT_TIMESTAMP_TEMPLATE + ",\n" + + " \"workerId\": \"" + WORKER_ID_TEMPLATE + "\",\n" + + " \"successorWorkItems\": \"" + SUCCESSOR_WORK_ITEM_IDS_TEMPLATE + "\"\n" + + " },\n" + + " \"source\": \"" + + " if (ctx._source.scriptVersion != \\\"" + SCRIPT_VERSION_TEMPLATE + "\\\") {" + + " throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);" + + " } " + + " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " != params.workerId) {" + + " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source." + + LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);" + + " } else {" + + " ctx._source." 
+ SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;" + + " }" + + "\"\n" + + " }\n" + + "}"; + + var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc") + .replace(WORKER_ID_TEMPLATE, workerId) + .replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000)) + .replace(SUCCESSOR_WORK_ITEM_IDS_TEMPLATE, String.join(",", successorWorkItemIds)); + + var response = httpClient.makeJsonRequest( + AbstractedHttpClient.POST_METHOD, + INDEX_NAME + "/_update/" + workItemId, + null, + body + ); + if (DocumentModificationResult.UPDATED != getResult(response)) { + throw new IllegalStateException( + "Unexpected response for workItemId: " + + workItemId + + ". Response: " + + response.toDiagnosticString() + ); + } + } + + // This is an idempotent function to create multiple unassigned work items. It uses the `create` function in the bulk API + // which creates a document only if the specified ID doesn't yet exist. + public void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds) throws IOException, InterruptedException, ResponseException { + String workItemBodyTemplate = "{\"numAttempts\":0, \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " + + "\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }"; + String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc").replace(WORKER_ID_TEMPLATE, workerId); + + StringBuilder body = new StringBuilder(); + for (var workItemId : workItemIds) { + body.append("{\"create\":{\"_id\":\"").append(workItemId).append("\"}}\n"); + body.append(workItemBody).append("\n"); + } + var response = httpClient.makeJsonRequest( + AbstractedHttpClient.POST_METHOD, + INDEX_NAME + "/_bulk", + null, + body.toString() + ); + var statusCode = response.getStatusCode(); + if (statusCode != 200) { + throw new ResponseException(response); // todo + } + // parse the response body and if any of the writes failed with anything EXCEPT a 
version conflict, throw an exception + var resultTree = objectMapper.readTree(response.getPayloadBytes()); + var errors = resultTree.path("errors").asBoolean(); + if (!errors) { + return; + } + var acceptableErrorCodes = List.of(201, 409); + + var createResults = new ArrayList(); + resultTree.path("items").elements().forEachRemaining( + item -> createResults.add(acceptableErrorCodes.contains(item.path("create").path("status").asInt())) + ); + if (createResults.contains(false)) { + throw new ResponseException(response); + } + + } + + public void createSuccessorWorkItemsAndMarkComplete( + String workItemId, + ArrayList successorWorkItemIds, + Supplier contextSupplier + ) throws IOException, InterruptedException, ResponseException { + try (var ctx = contextSupplier.get()) { + updateWorkItemWithSuccessors(workItemId, successorWorkItemIds); + createUnassignedWorkItemsIfNonexistent(successorWorkItemIds); + completeWorkItem(workItemId, ctx::getCompleteWorkItemContext); + refresh(ctx::getRefreshContext); + } + } + @AllArgsConstructor private static class MaxTriesExceededException extends Exception { final transient Object suppliedValue; diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java index 83263f4ed..147483f6c 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/workcoordination/WorkCoordinatorTest.java @@ -156,7 +156,7 @@ public void testAcquireLeaseForQuery() throws Exception { Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); try ( - var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "firstPass_NONE") + var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "NONE") ) { var nextWorkItem = workCoordinator.acquireNextWorkItem( 
Duration.ofSeconds(2), @@ -184,12 +184,169 @@ public void testAcquireLeaseForQuery() throws Exception { InMemoryInstrumentationBundle.getMetricValueOrZero(metrics, "acquireNextWorkItemRetries")); } + @Test + public void testAddSuccessorWorkItems() throws Exception { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 20; + final var NUM_SUCCESSOR_ITEMS = 3; + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R" + i; + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + } + + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "claimItemWorker")) { + for (var i = 0; i < NUM_DOCS; ++i) { + String workItemId = getWorkItemAndVerify(testContext, "claimItemWorker", new ConcurrentHashMap<>(), Duration.ofSeconds(5), false, false); + var currentNumPendingItems = workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext); + ArrayList successorWorkItems = new ArrayList<>(); + for (int j = 0; j < NUM_SUCCESSOR_ITEMS; j++) { + successorWorkItems.add(workItemId + "_successor_" + j); + } + workCoordinator.createSuccessorWorkItemsAndMarkComplete( + workItemId, successorWorkItems, + testContext::createSuccessorWorkItemsContext + ); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + // One item marked as completed, and NUM_SUCCESSOR_ITEMS created. 
+ Assertions.assertEquals(currentNumPendingItems - 1 + NUM_SUCCESSOR_ITEMS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } + + Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } + } + + @Test + public void testAddSuccessorWorkItemsSimultaneous() throws Exception { + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + final var NUM_DOCS = 20; + final var NUM_SUCCESSOR_ITEMS = 3; + var executorService = Executors.newFixedThreadPool(NUM_DOCS); + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "docCreatorWorker")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + for (var i = 0; i < NUM_DOCS; ++i) { + final var docId = "R" + i; + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + } + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + } + + final var seenWorkerItems = new ConcurrentHashMap(); + var allFutures = new ArrayList>(); + final var expiration = Duration.ofSeconds(5); + for (int i = 0; i < NUM_DOCS; ++i) { + int finalI = i; + allFutures.add( + CompletableFuture.supplyAsync( + () -> getWorkItemAndCompleteWithSuccessors(testContext, "successor_test_" + finalI, seenWorkerItems, expiration, true, NUM_SUCCESSOR_ITEMS), + executorService + ) + ); + } + CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).join(); + Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size()); + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "checkResults")) { + Assertions.assertEquals(NUM_SUCCESSOR_ITEMS * NUM_DOCS, workCoordinator.numWorkItemsNotYetComplete(testContext::createItemsPendingContext)); + } + } + + @Test + public void testAddSuccessorWorkItemsPartiallyCompleted() 
throws Exception { + // A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created + // but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specified successor items. + var testContext = WorkCoordinationTestContext.factory().withAllTracking(); + var docId = "R0"; + var N_SUCCESSOR_ITEMS = 3; + var successorItems = new ArrayList(); + for (int i = 0; i < N_SUCCESSOR_ITEMS; i++) { + successorItems.add(docId + "_successor_" + i); + } + + var originalWorkItemExpiration = Duration.ofSeconds(5); + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) { + Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext); + Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext)); + // Claim the work item + getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); + var client = httpClientSupplier.get(); + // Add the list of successors to the work item + var body = "{\"doc\": {\"successor_items\": \"" + String.join(",", successorItems) + "\"}}"; + var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body); + var responseBody = (new ObjectMapper()).readTree(response.getPayloadBytes()); + Assertions.assertEquals(200, response.getStatusCode()); + // Create a successor item and then claim it with a long lease. 
+ workCoordinator.createUnassignedWorkItem(successorItems.get(0), testContext::createUnassignedWorkContext); + // Now, we should be able to claim the first successor item with a different worker id + // We should NOT be able to claim the other successor items yet (since they haven't been created yet) or the original item + String workItemId = getWorkItemAndVerify(testContext, "claimSuccessorItem", new ConcurrentHashMap<>(), Duration.ofSeconds(60), false, false); + Assertions.assertEquals(successorItems.get(0), workItemId); // We need to ensure that the item we just claimed is the expected one. + + // Sleep for the remainder of the original work item's lease so that it becomes available. + Thread.sleep(originalWorkItemExpiration.toMillis() + 1000); + + // Okay, we're now in a state where the only document available is the original, incomplete one. + // We need to make sure that if we pick it up and call `createSuccessorWorkItemsAndMarkComplete`, it will complete successfully and create the two missing items. 
+ var originalWorkItemId = getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); + Assertions.assertEquals(docId, originalWorkItemId); + workCoordinator.createSuccessorWorkItemsAndMarkComplete(originalWorkItemId, successorItems, testContext::createSuccessorWorkItemsContext); + + // Now, we should be able to claim the other successor items but the _next_ call should fail because there are no available items + for (int i = 0; i < (N_SUCCESSOR_ITEMS - 1); i++) { + workItemId = getWorkItemAndVerify(testContext, "claimItem_" + i, new ConcurrentHashMap<>(), originalWorkItemExpiration, false, true); + Assertions.assertTrue(successorItems.contains(workItemId)); + } + + Assertions.assertThrows(IllegalStateException.class, () -> { + getWorkItemAndVerify(testContext, "finalClaimItem", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false); + }); + } + } + + // Create a test where a work item tries to create itself as a successor -- it should fail and NOT be marked as complete. Another worker should pick it up and double the lease time. 
+ + @SneakyThrows + private String getWorkItemAndCompleteWithSuccessors( + WorkCoordinationTestContext testContext, + String workerName, + ConcurrentHashMap seenWorkerItems, + Duration expirationWindow, + boolean placeFinishedDoc, + int numSuccessorItems + ) { + var workItemId = getWorkItemAndVerify( + testContext, + workerName, + seenWorkerItems, + expirationWindow, + placeFinishedDoc, + false + ); + ArrayList successorWorkItems = new ArrayList<>(); + for (int j = 0; j < numSuccessorItems; j++) { + successorWorkItems.add(workItemId + "_successor_" + j); + } + try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, workerName)) { + workCoordinator.createSuccessorWorkItemsAndMarkComplete( + workItemId, successorWorkItems, + testContext::createSuccessorWorkItemsContext + ); + } catch (Exception e) { + throw new RuntimeException(e); + } + return workItemId; + } + + static AtomicInteger nonce = new AtomicInteger(); @SneakyThrows private String getWorkItemAndVerify( WorkCoordinationTestContext testContext, - String workerSuffix, + String workerName, ConcurrentHashMap seenWorkerItems, Duration expirationWindow, boolean placeFinishedDoc, @@ -198,8 +355,7 @@ private String getWorkItemAndVerify( try ( var workCoordinator = new OpenSearchWorkCoordinator( httpClientSupplier.get(), - 3600, - "firstPass_" + workerSuffix + 3600, workerName ) ) { var doneId = DUMMY_FINISHED_DOC_ID + "_" + nonce.incrementAndGet(); @@ -251,4 +407,5 @@ public String onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) thro throw e; } } + }