diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java index acaa7d7ed..840010e1d 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java @@ -69,7 +69,7 @@ WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( Duration leaseDuration, Supplier contextSupplier ) throws IOException; - + /** * Scan the created work items that have not yet had leases acquired and have not yet finished. * One of those work items will be returned along with a lease for how long this process may continue @@ -99,6 +99,20 @@ void completeWorkItem( Supplier contextSupplier ) throws IOException, InterruptedException; + /** + * Add the list of successor items to the work item, create new work items for each of the successors, and mark the + * original work item as completed. + * @param workItemId the work item that is being completed + * @param successorWorkItemIds the list of successor work items that will be created + * @throws IOException + * @throws InterruptedException + */ + void createSuccessorWorkItemsAndMarkComplete( + String workItemId, + ArrayList successorWorkItemIds, + Supplier contextSupplier + ) throws IOException, InterruptedException; + /** * @return the number of items that are not yet complete. This will include items with and without claimed leases. 
* @throws IOException diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java index 95672d62a..b88413613 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java @@ -14,6 +14,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.JsonNode; import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts; import com.fasterxml.jackson.databind.ObjectMapper; @@ -315,6 +316,13 @@ public boolean createUnassignedWorkItem( } } + private ArrayList getSuccessorItemsIfPresent(JsonNode responseDoc) { + if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) { + return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(","))); + } + return new ArrayList<>(); + } + @Override @NonNull public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( @@ -328,7 +336,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( var resultFromUpdate = getResult(updateResponse); if (resultFromUpdate == DocumentModificationResult.CREATED) { - return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration), new ArrayList<>()); // todo + return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration), new ArrayList<>()); } else { final var httpResponse = httpClient.makeJsonRequest( AbstractedHttpClient.GET_METHOD, @@ -341,7 +349,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem( return new WorkItemAndDuration( workItemId, Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()), - new ArrayList() // todo + getSuccessorItemsIfPresent(responseDoc) ); } else if 
(!responseDoc.path(COMPLETED_AT_FIELD_NAME).isMissingNode()) { return new AlreadyCompleted(); @@ -605,7 +613,7 @@ private WorkItemAndDuration getAssignedWorkItemUnsafe() throw new MalformedAssignedWorkDocumentException(response); } - var successorItems = resultHitInner.has(SUCCESSOR_ITEMS_FIELD_NAME) ? new ArrayList<>(Arrays.asList(resultHitInner.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(","))) : new ArrayList(); + var successorItems = getSuccessorItemsIfPresent(resultHitInner); var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration), successorItems); log.atInfo().setMessage(() -> "Returning work item and lease: " + rval).log(); return rval; @@ -667,7 +675,7 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList s + " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);" + " }" - + " if (ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != null && ctx._source" + SUCCESSOR_ITEMS_FIELD_NAME + "!= params.successorWorkItems) {" + + " if (ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != null && ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != params.successorWorkItems) {" + " throw new IllegalArgumentException(\\\"The " + SUCCESSOR_ITEMS_FIELD_NAME + " field cannot be updated with a different value.\\\")" + " }" + " ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;" @@ -698,7 +706,7 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList s // This is a idempotent function to create multiple unassigned work items. It uses the `create` function in the bulk API // which creates a document only if the specified ID doesn't yet exist. 
- public void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds) throws IOException, InterruptedException, ResponseException { + private void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds) throws IOException, IllegalStateException { String workItemBodyTemplate = "{\"numAttempts\":0, \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " + "\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }"; String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc").replace(WORKER_ID_TEMPLATE, workerId); @@ -716,7 +724,13 @@ public void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds ); var statusCode = response.getStatusCode(); if (statusCode != 200) { - throw new ResponseException(response); // todo + throw new IllegalStateException( + "A bulk request to create successor work item(s), " + + String.join(", ", workItemIds) + + " returned an unexpected status code " + + statusCode + + " instead of 200" + ); } // parse the response body and if any of the writes failed with anything EXCEPT a version conflict, throw an exception var resultTree = objectMapper.readTree(response.getPayloadBytes()); @@ -724,23 +738,29 @@ public void createUnassignedWorkItemsIfNonexistent(ArrayList workItemIds if (!errors) { return; } - var acceptableErrorCodes = List.of(201, 409); + var acceptableStatusCodes = List.of(201, 409); var createResults = new ArrayList(); resultTree.path("items").elements().forEachRemaining( - item -> createResults.add(acceptableErrorCodes.contains(item.path("create").path("status").asInt())) + item -> createResults.add(acceptableStatusCodes.contains(item.path("create").path("status").asInt())) ); if (createResults.contains(false)) { - throw new ResponseException(response); + throw new IllegalStateException( + "One or more of the successor work item(s) could not be created: " + + String.join(", ", workItemIds) + + ". 
Response: " + + response.toDiagnosticString() + ); } } + @Override public void createSuccessorWorkItemsAndMarkComplete( String workItemId, ArrayList successorWorkItemIds, Supplier contextSupplier - ) throws IOException, InterruptedException, ResponseException { + ) throws IOException, InterruptedException, IllegalStateException { try (var ctx = contextSupplier.get()) { updateWorkItemWithSuccessors(workItemId, successorWorkItemIds); createUnassignedWorkItemsIfNonexistent(successorWorkItemIds); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java index 68f91b42e..e3990a92e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/ScopedWorkCoordinator.java @@ -7,6 +7,7 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.bulkload.worker.DocumentsRunner; @Slf4j public class ScopedWorkCoordinator { @@ -27,7 +28,8 @@ public interface WorkItemGetter { public T ensurePhaseCompletion( WorkItemGetter workItemIdSupplier, IWorkCoordinator.WorkAcquisitionOutcomeVisitor visitor, - Supplier contextSupplier + Supplier contextSupplier, + Supplier successorContextSupplier ) throws IOException, InterruptedException { var acquisitionResult = workItemIdSupplier.tryAcquire(workCoordinator); return acquisitionResult.visit(new IWorkCoordinator.WorkAcquisitionOutcomeVisitor() { @@ -46,6 +48,11 @@ public T onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) throws IO InterruptedException { var workItemId = workItem.getWorkItemId(); leaseExpireTrigger.registerExpiration(workItem.workItemId, workItem.leaseExpirationTime); + if (!workItem.successorWorkItems.isEmpty()) { + workCoordinator.createSuccessorWorkItemsAndMarkComplete(workItemId, 
workItem.successorWorkItems, successorContextSupplier); + leaseExpireTrigger.markWorkAsCompleted(workItemId); + return visitor.onAlreadyCompleted(); + } var rval = visitor.onAcquiredWork(workItem); workCoordinator.completeWorkItem(workItemId, contextSupplier); leaseExpireTrigger.markWorkAsCompleted(workItemId); diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java index 958b5d3e6..046a91cb2 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java @@ -71,7 +71,7 @@ public CompletionStatus onAcquiredWork(IWorkCoordinator.WorkItemAndDuration work public CompletionStatus onNoAvailableWorkToBeDone() throws IOException { return CompletionStatus.NOTHING_DONE; } - }, context::createCloseContet); + }, context::createCloseContet, context::createSuccessorWorkItemsContext); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java index 7e79418ae..f2377fd2a 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/ShardWorkPreparer.java @@ -85,7 +85,7 @@ public Void onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) { public Void onNoAvailableWorkToBeDone() throws IOException { return null; } - }, context::createWorkCompletionContext); + }, context::createWorkCompletionContext, context::createSuccessorWorkItemsContext); } @SneakyThrows diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java index 2c17bcb42..697834060 100644 --- 
a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationContexts.java @@ -72,6 +72,11 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createWorkCompletionCo public IAddShardWorkItemContext createShardWorkItemContext() { return new AddShardWorkItemContext(rootInstrumentationScope, this); } + + @Override + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() { + return getWorkCoordinationRootContext().createSuccessorWorkItemsContext(getEnclosingScope()); + } } class AddShardWorkItemContext extends BaseNestedSpanContext< @@ -175,5 +180,10 @@ public IWorkCoordinationContexts.IAcquireNextWorkItemContext createOpeningContex public IWorkCoordinationContexts.ICompleteWorkItemContext createCloseContet() { return getWorkCoordinationRootContext().createCompleteWorkContext(); } + + @Override + public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() { + return getWorkCoordinationRootContext().createSuccessorWorkItemsContext(); + } } } diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java index 9a3cb580d..6d7d257ce 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/IDocumentMigrationContexts.java @@ -26,6 +26,8 @@ interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes { IWorkCoordinationContexts.ICompleteWorkItemContext createWorkCompletionContext(); IAddShardWorkItemContext createShardWorkItemContext(); + + IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext(); } interface IAddShardWorkItemContext extends 
IScopedInstrumentationAttributes { @@ -42,5 +44,8 @@ interface IDocumentReindexContext IRfsContexts.IRequestContext createBulkRequest(); IRfsContexts.IRequestContext createRefreshContext(); + + IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext(); + } }