Skip to content

Commit

Permalink
Integrate into workflow (anytime a worker picks up a work item with s…
Browse files Browse the repository at this point in the history
…uccessor items, it should attempt to redrive the successor item process)

Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson committed Oct 25, 2024
1 parent 9af9db4 commit 10b0ede
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
Duration leaseDuration,
Supplier<IWorkCoordinationContexts.IAcquireSpecificWorkContext> contextSupplier
) throws IOException;

/**
* Scan the created work items that have not yet had leases acquired and have not yet finished.
* One of those work items will be returned along with a lease for how long this process may continue
Expand Down Expand Up @@ -99,6 +99,20 @@ void completeWorkItem(
Supplier<IWorkCoordinationContexts.ICompleteWorkItemContext> contextSupplier
) throws IOException, InterruptedException;

/**
* Add the list of successor items to the work item, create new work items for each of the successors, and mark the
* original work item as completed.
* @param workItemId the work item that is being completed
* @param successorWorkItemIds the list of successor work items that will be created
* @throws IOException
* @throws InterruptedException
*/
void createSuccessorWorkItemsAndMarkComplete(
String workItemId,
ArrayList<String> successorWorkItemIds,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) throws IOException, InterruptedException;

/**
* @return the number of items that are not yet complete. This will include items with and without claimed leases.
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -315,6 +316,13 @@ public boolean createUnassignedWorkItem(
}
}

private ArrayList<String> getSuccessorItemsIfPresent(JsonNode responseDoc) {
if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) {
return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(",")));
}
return new ArrayList<>();
}

@Override
@NonNull
public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
Expand All @@ -328,7 +336,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
var resultFromUpdate = getResult(updateResponse);

if (resultFromUpdate == DocumentModificationResult.CREATED) {
return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration), new ArrayList<>()); // todo
return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration), new ArrayList<>());
} else {
final var httpResponse = httpClient.makeJsonRequest(
AbstractedHttpClient.GET_METHOD,
Expand All @@ -341,7 +349,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
return new WorkItemAndDuration(
workItemId,
Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()),
new ArrayList<String>() // todo
getSuccessorItemsIfPresent(responseDoc)
);
} else if (!responseDoc.path(COMPLETED_AT_FIELD_NAME).isMissingNode()) {
return new AlreadyCompleted();
Expand Down Expand Up @@ -605,7 +613,7 @@ private WorkItemAndDuration getAssignedWorkItemUnsafe()
throw new MalformedAssignedWorkDocumentException(response);
}

var successorItems = resultHitInner.has(SUCCESSOR_ITEMS_FIELD_NAME) ? new ArrayList<>(Arrays.asList(resultHitInner.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(","))) : new ArrayList<String>();
var successorItems = getSuccessorItemsIfPresent(resultHitInner);
var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration), successorItems);
log.atInfo().setMessage(() -> "Returning work item and lease: " + rval).log();
return rval;
Expand Down Expand Up @@ -667,7 +675,7 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList<String> s
+ " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source."
+ LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);"
+ " }"
+ " if (ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != null && ctx._source" + SUCCESSOR_ITEMS_FIELD_NAME + "!= params.successorWorkItems) {"
+ " if (ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != null && ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != params.successorWorkItems) {"
+ " throw new IllegalArgumentException(\\\"The " + SUCCESSOR_ITEMS_FIELD_NAME + " field cannot be updated with a different value.\\\")"
+ " }"
+ " ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;"
Expand Down Expand Up @@ -698,7 +706,7 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList<String> s

// This is a idempotent function to create multiple unassigned work items. It uses the `create` function in the bulk API
// which creates a document only if the specified ID doesn't yet exist.
public void createUnassignedWorkItemsIfNonexistent(ArrayList<String> workItemIds) throws IOException, InterruptedException, ResponseException {
private void createUnassignedWorkItemsIfNonexistent(ArrayList<String> workItemIds) throws IOException, IllegalStateException {
String workItemBodyTemplate = "{\"numAttempts\":0, \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " +
"\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }";
String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc").replace(WORKER_ID_TEMPLATE, workerId);
Expand All @@ -716,31 +724,43 @@ public void createUnassignedWorkItemsIfNonexistent(ArrayList<String> workItemIds
);
var statusCode = response.getStatusCode();
if (statusCode != 200) {
throw new ResponseException(response); // todo
throw new IllegalStateException(
"A bulk request to create successor work item(s), "
+ String.join(", ", workItemIds)
+ "returned an unexpected status code "
+ statusCode
+ " instead of 200"
);
}
// parse the response body and if any of the writes failed with anything EXCEPT a version conflict, throw an exception
var resultTree = objectMapper.readTree(response.getPayloadBytes());
var errors = resultTree.path("errors").asBoolean();
if (!errors) {
return;
}
var acceptableErrorCodes = List.of(201, 409);
var acceptableStatusCodes = List.of(201, 409);

var createResults = new ArrayList<Boolean>();
resultTree.path("items").elements().forEachRemaining(
item -> createResults.add(acceptableErrorCodes.contains(item.path("create").path("status").asInt()))
item -> createResults.add(acceptableStatusCodes.contains(item.path("create").path("status").asInt()))
);
if (createResults.contains(false)) {
throw new ResponseException(response);
throw new IllegalStateException(
"One or more of the successor work item(s) could not be created: "
+ String.join(", ", workItemIds)
+ ". Response: "
+ response.toDiagnosticString()
);
}

}

@Override
public void createSuccessorWorkItemsAndMarkComplete(
String workItemId,
ArrayList<String> successorWorkItemIds,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) throws IOException, InterruptedException, ResponseException {
) throws IOException, InterruptedException, IllegalStateException {
try (var ctx = contextSupplier.get()) {
updateWorkItemWithSuccessors(workItemId, successorWorkItemIds);
createUnassignedWorkItemsIfNonexistent(successorWorkItemIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;

@Slf4j
public class ScopedWorkCoordinator {
Expand All @@ -27,7 +28,8 @@ public interface WorkItemGetter {
public <T> T ensurePhaseCompletion(
WorkItemGetter workItemIdSupplier,
IWorkCoordinator.WorkAcquisitionOutcomeVisitor<T> visitor,
Supplier<IWorkCoordinationContexts.ICompleteWorkItemContext> contextSupplier
Supplier<IWorkCoordinationContexts.ICompleteWorkItemContext> contextSupplier,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> successorContextSupplier
) throws IOException, InterruptedException {
var acquisitionResult = workItemIdSupplier.tryAcquire(workCoordinator);
return acquisitionResult.visit(new IWorkCoordinator.WorkAcquisitionOutcomeVisitor<T>() {
Expand All @@ -46,6 +48,11 @@ public T onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) throws IO
InterruptedException {
var workItemId = workItem.getWorkItemId();
leaseExpireTrigger.registerExpiration(workItem.workItemId, workItem.leaseExpirationTime);
if (!workItem.successorWorkItems.isEmpty()) {
workCoordinator.createSuccessorWorkItemsAndMarkComplete(workItemId, workItem.successorWorkItems, successorContextSupplier);
leaseExpireTrigger.markWorkAsCompleted(workItemId);
return visitor.onAlreadyCompleted();
}
var rval = visitor.onAcquiredWork(workItem);
workCoordinator.completeWorkItem(workItemId, contextSupplier);
leaseExpireTrigger.markWorkAsCompleted(workItemId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public CompletionStatus onAcquiredWork(IWorkCoordinator.WorkItemAndDuration work
public CompletionStatus onNoAvailableWorkToBeDone() throws IOException {
return CompletionStatus.NOTHING_DONE;
}
}, context::createCloseContet);
}, context::createCloseContet, context::createSuccessorWorkItemsContext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Void onAcquiredWork(IWorkCoordinator.WorkItemAndDuration workItem) {
public Void onNoAvailableWorkToBeDone() throws IOException {
return null;
}
}, context::createWorkCompletionContext);
}, context::createWorkCompletionContext, context::createSuccessorWorkItemsContext);
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createWorkCompletionCo
public IAddShardWorkItemContext createShardWorkItemContext() {
return new AddShardWorkItemContext(rootInstrumentationScope, this);
}

@Override
public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() {
return getWorkCoordinationRootContext().createSuccessorWorkItemsContext(getEnclosingScope());
}
}

class AddShardWorkItemContext extends BaseNestedSpanContext<
Expand Down Expand Up @@ -175,5 +180,10 @@ public IWorkCoordinationContexts.IAcquireNextWorkItemContext createOpeningContex
public IWorkCoordinationContexts.ICompleteWorkItemContext createCloseContet() {
return getWorkCoordinationRootContext().createCompleteWorkContext();
}

@Override
public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() {
return getWorkCoordinationRootContext().createSuccessorWorkItemsContext();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ interface IShardSetupAttemptContext extends IScopedInstrumentationAttributes {
IWorkCoordinationContexts.ICompleteWorkItemContext createWorkCompletionContext();

IAddShardWorkItemContext createShardWorkItemContext();

IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext();
}

interface IAddShardWorkItemContext extends IScopedInstrumentationAttributes {
Expand All @@ -42,5 +44,8 @@ interface IDocumentReindexContext
IRfsContexts.IRequestContext createBulkRequest();

IRfsContexts.IRequestContext createRefreshContext();

IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext();

}
}

0 comments on commit 10b0ede

Please sign in to comment.