Skip to content

Commit

Permalink
Main logic and tests for successor work items
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson committed Oct 24, 2024
1 parent f573157 commit 89198da
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ActivityNames {
public static final String ACQUIRE_SPECIFIC_WORK = "acquireSpecificWorkItem";
public static final String COMPLETE_WORK = "completeWork";
public static final String ACQUIRE_NEXT_WORK = "acquireNextWorkItem";
public static final String CREATE_SUCCESSOR_WORK_ITEMS = "createSuccessorWorkItems";

private ActivityNames() {}
}
Expand Down Expand Up @@ -76,6 +77,13 @@ interface ICompleteWorkItemContext extends IRetryableActivityContext {
IRefreshContext getRefreshContext();
}

interface ICreateSuccessorWorkItemsContext extends IRetryableActivityContext {
String ACTIVITY_NAME = ActivityNames.CREATE_SUCCESSOR_WORK_ITEMS;
IRefreshContext getRefreshContext();
ICompleteWorkItemContext getCompleteWorkItemContext();
ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext();
}

interface IScopedWorkContext<C extends IBaseAcquireWorkContext> extends IScopedInstrumentationAttributes {
C createOpeningContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class RootWorkCoordinationContext extends RootOtelContext {
public final WorkCoordinationContexts.AcquireSpecificWorkContext.MetricInstruments acquireSpecificWorkMetrics;
public final WorkCoordinationContexts.CompleteWorkItemContext.MetricInstruments completeWorkMetrics;
public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics;
public final WorkCoordinationContexts.CreateSuccessorWorkItemsContext.MetricInstruments createSuccessorWorkItemsMetrics;

public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) {
this(sdk, contextTracker, null);
Expand All @@ -38,6 +39,7 @@ public RootWorkCoordinationContext(OpenTelemetry sdk,
acquireSpecificWorkMetrics = WorkCoordinationContexts.AcquireSpecificWorkContext.makeMetrics(meter);
completeWorkMetrics = WorkCoordinationContexts.CompleteWorkItemContext.makeMetrics(meter);
acquireNextWorkMetrics = WorkCoordinationContexts.AcquireNextWorkItemContext.makeMetrics(meter);
createSuccessorWorkItemsMetrics = WorkCoordinationContexts.CreateSuccessorWorkItemsContext.makeMetrics(meter);
}

public IWorkCoordinationContexts.IInitializeCoordinatorStateContext createCoordinationInitializationStateContext() {
Expand Down Expand Up @@ -87,4 +89,13 @@ public IWorkCoordinationContexts.ICompleteWorkItemContext createCompleteWorkCont
) {
return new WorkCoordinationContexts.CompleteWorkItemContext(this, enclosingScope);
}
}

public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext() {
return createSuccessorWorkItemsContext(null);
}

public IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext createSuccessorWorkItemsContext(
IScopedInstrumentationAttributes enclosingScope
) {
return new WorkCoordinationContexts.CreateSuccessorWorkItemsContext(this, enclosingScope);
}}
Original file line number Diff line number Diff line change
Expand Up @@ -363,4 +363,52 @@ public MetricInstruments getRetryMetrics() {
return getRootInstrumentationScope().completeWorkMetrics;
}
}

@Getter
class CreateSuccessorWorkItemsContext extends BaseSpanContext<RootWorkCoordinationContext>
implements
ICreateSuccessorWorkItemsContext,
RetryableActivityContextMetricMixin<CreateSuccessorWorkItemsContext.MetricInstruments> {
final IScopedInstrumentationAttributes enclosingScope;

CreateSuccessorWorkItemsContext(
RootWorkCoordinationContext rootScope,
IScopedInstrumentationAttributes enclosingScope
) {
super(rootScope);
this.enclosingScope = enclosingScope;
initializeSpan(rootScope);
}

@Override
public String getActivityName() {
return ACTIVITY_NAME;
}

@Override
public IRefreshContext getRefreshContext() {
return new Refresh(this.rootInstrumentationScope, this);
}

@Override
public ICompleteWorkItemContext getCompleteWorkItemContext() { return new CompleteWorkItemContext(this.rootInstrumentationScope, this); }

@Override
public ICreateUnassignedWorkItemContext getCreateUnassignedWorkItemContext() { return new CreateUnassignedWorkItemContext(this.rootInstrumentationScope, this); }

public static class MetricInstruments extends RetryMetricInstruments {
private MetricInstruments(Meter meter, String activityName) {
super(meter, autoLabels(activityName));
}
}

public static @NonNull MetricInstruments makeMetrics(Meter meter) {
return new MetricInstruments(meter, ACTIVITY_NAME);
}

@Override
public MetricInstruments getRetryMetrics() {
return getRootInstrumentationScope().createSuccessorWorkItemsMetrics;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.function.Supplier;

import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;
Expand Down Expand Up @@ -170,6 +171,7 @@ class LeaseLockHeldElsewhereException extends RuntimeException {}
class WorkItemAndDuration implements WorkAcquisitionOutcome {
final String workItemId;
final Instant leaseExpirationTime;
final ArrayList<String> successorWorkItems;

@Override
public <T> T visit(WorkAcquisitionOutcomeVisitor<T> v) throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;
Expand Down Expand Up @@ -37,7 +40,8 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator {
public static final String CLIENT_TIMESTAMP_TEMPLATE = "{CLIENT_TIMESTAMP}";
public static final String EXPIRATION_WINDOW_TEMPLATE = "{EXPIRATION_WINDOW}";
public static final String CLOCK_DEVIATION_SECONDS_THRESHOLD_TEMPLATE = "{CLOCK_DEVIATION_SECONDS_THRESHOLD}";
public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "OLD_EXPIRATION_THRESHOLD";
public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "{OLD_EXPIRATION_THRESHOLD}";
public static final String SUCCESSOR_WORK_ITEM_IDS_TEMPLATE = "{SUCCESSOR_WORK_ITEM_IDS}";

public static final String RESULT_OPENSSEARCH_FIELD_NAME = "result";
public static final String EXPIRATION_FIELD_NAME = "expiration";
Expand All @@ -46,6 +50,7 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator {
public static final String VERSION_CONFLICTS_FIELD_NAME = "version_conflicts";
public static final String COMPLETED_AT_FIELD_NAME = "completedAt";
public static final String SOURCE_FIELD_NAME = "_source";
public static final String SUCCESSOR_ITEMS_FIELD_NAME = "successor_items";

public static final String QUERY_INCOMPLETE_EXPIRED_ITEMS_STR = " \"query\": {\n"
+ " \"bool\": {"
Expand Down Expand Up @@ -138,6 +143,10 @@ public void setup(Supplier<IWorkCoordinationContexts.IInitializeCoordinatorState
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " }\n"
+ " }\n"
+ " }\n"
Expand Down Expand Up @@ -227,14 +236,12 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument(
+ " throw new IllegalArgumentException(\\\"The current times indicated between the client and server are too different.\\\");"
+ " }"
+ " long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.numAttempts)) * params.expirationWindow);"
+ " if (params.expirationWindow > 0 && "
+ // don't obtain a lease lock
" ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {"
+ // already done
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && "
+ " if (params.expirationWindow > 0 && ctx._source." + COMPLETED_AT_FIELD_NAME + " == null) {"
+ // already done
" if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " == params.workerId && "
+ " ctx._source." + EXPIRATION_FIELD_NAME + " > serverTimeSeconds) {"
+ // count as an update to force the caller to lookup the expiration time, but no need to modify it
" ctx.op = \\\"update\\\";"
" ctx.op = \\\"update\\\";"
+ " } else if (ctx._source." + EXPIRATION_FIELD_NAME + " < serverTimeSeconds && " + // is expired
" ctx._source." + EXPIRATION_FIELD_NAME + " < newExpiration) {" + // sanity check
" ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;"
Expand Down Expand Up @@ -321,7 +328,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
var resultFromUpdate = getResult(updateResponse);

if (resultFromUpdate == DocumentModificationResult.CREATED) {
return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration));
return new WorkItemAndDuration(workItemId, startTime.plus(leaseDuration), new ArrayList<>()); // todo
} else {
final var httpResponse = httpClient.makeJsonRequest(
AbstractedHttpClient.GET_METHOD,
Expand All @@ -333,7 +340,8 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
if (resultFromUpdate == DocumentModificationResult.UPDATED) {
return new WorkItemAndDuration(
workItemId,
Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue())
Instant.ofEpochMilli(1000 * responseDoc.path(EXPIRATION_FIELD_NAME).longValue()),
new ArrayList<String>() // todo
);
} else if (!responseDoc.path(COMPLETED_AT_FIELD_NAME).isMissingNode()) {
return new AlreadyCompleted();
Expand Down Expand Up @@ -596,7 +604,9 @@ private WorkItemAndDuration getAssignedWorkItemUnsafe()
"Expiration wasn't found or wasn't set to > 0 for response:" + response.toDiagnosticString()).log();
throw new MalformedAssignedWorkDocumentException(response);
}
var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration));

var successorItems = resultHitInner.has(SUCCESSOR_ITEMS_FIELD_NAME) ? new ArrayList<>(Arrays.asList(resultHitInner.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(","))) : new ArrayList<String>();
var rval = new WorkItemAndDuration(resultHitInner.get("_id").asText(), Instant.ofEpochMilli(1000 * expiration), successorItems);
log.atInfo().setMessage(() -> "Returning work item and lease: " + rval).log();
return rval;
}
Expand Down Expand Up @@ -640,6 +650,103 @@ private WorkItemAndDuration getAssignedWorkItem(LeaseChecker leaseChecker,
}
}

private void updateWorkItemWithSuccessors(String workItemId, ArrayList<String> successorWorkItemIds) throws IOException {
final var updateSuccessorWorkItemsTemplate = "{\n"
+ " \"script\": {\n"
+ " \"lang\": \"painless\",\n"
+ " \"params\": { \n"
+ " \"clientTimestamp\": " + CLIENT_TIMESTAMP_TEMPLATE + ",\n"
+ " \"workerId\": \"" + WORKER_ID_TEMPLATE + "\",\n"
+ " \"successorWorkItems\": \"" + SUCCESSOR_WORK_ITEM_IDS_TEMPLATE + "\"\n"
+ " },\n"
+ " \"source\": \""
+ " if (ctx._source.scriptVersion != \\\"" + SCRIPT_VERSION_TEMPLATE + "\\\") {"
+ " throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);"
+ " } "
+ " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " != params.workerId) {"
+ " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source."
+ LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);"
+ " } else {"
+ " ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;"
+ " }"
+ "\"\n"
+ " }\n"
+ "}";

var body = updateSuccessorWorkItemsTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc")
.replace(WORKER_ID_TEMPLATE, workerId)
.replace(CLIENT_TIMESTAMP_TEMPLATE, Long.toString(clock.instant().toEpochMilli() / 1000))
.replace(SUCCESSOR_WORK_ITEM_IDS_TEMPLATE, String.join(",", successorWorkItemIds));

var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
INDEX_NAME + "/_update/" + workItemId,
null,
body
);
if (DocumentModificationResult.UPDATED != getResult(response)) {
throw new IllegalStateException(
"Unexpected response for workItemId: "
+ workItemId
+ ". Response: "
+ response.toDiagnosticString()
);
}
}

// This is a idempotent function to create multiple unassigned work items. It uses the `create` function in the bulk API
// which creates a document only if the specified ID doesn't yet exist.
public void createUnassignedWorkItemsIfNonexistent(ArrayList<String> workItemIds) throws IOException, InterruptedException, ResponseException {
String workItemBodyTemplate = "{\"numAttempts\":0, \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " +
"\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }";
String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "poc").replace(WORKER_ID_TEMPLATE, workerId);

StringBuilder body = new StringBuilder();
for (var workItemId : workItemIds) {
body.append("{\"create\":{\"_id\":\"").append(workItemId).append("\"}}\n");
body.append(workItemBody).append("\n");
}
var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
INDEX_NAME + "/_bulk",
null,
body.toString()
);
var statusCode = response.getStatusCode();
if (statusCode != 200) {
throw new ResponseException(response); // todo
}
// parse the response body and if any of the writes failed with anything EXCEPT a version conflict, throw an exception
var resultTree = objectMapper.readTree(response.getPayloadBytes());
var errors = resultTree.path("errors").asBoolean();
if (!errors) {
return;
}
var acceptableErrorCodes = List.of(201, 409);

var createResults = new ArrayList<Boolean>();
resultTree.path("items").elements().forEachRemaining(
item -> createResults.add(acceptableErrorCodes.contains(item.path("create").path("status").asInt()))
);
if (createResults.contains(false)) {
throw new ResponseException(response);
}

}

public void createSuccessorWorkItemsAndMarkComplete(
String workItemId,
ArrayList<String> successorWorkItemIds,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) throws IOException, InterruptedException, ResponseException {
try (var ctx = contextSupplier.get()) {
updateWorkItemWithSuccessors(workItemId, successorWorkItemIds);
createUnassignedWorkItemsIfNonexistent(successorWorkItemIds);
completeWorkItem(workItemId, ctx::getCompleteWorkItemContext);
refresh(ctx::getRefreshContext);
}
}

@AllArgsConstructor
private static class MaxTriesExceededException extends Exception {
final transient Object suppliedValue;
Expand Down
Loading

0 comments on commit 89198da

Please sign in to comment.