Skip to content

Commit

Permalink
Add another check to never change the successor items
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson committed Oct 24, 2024
1 parent 89198da commit 9af9db4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,15 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList<String> s
+ " \"source\": \""
+ " if (ctx._source.scriptVersion != \\\"" + SCRIPT_VERSION_TEMPLATE + "\\\") {"
+ " throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);"
+ " } "
+ " }"
+ " if (ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " != params.workerId) {"
+ " throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source."
+ LEASE_HOLDER_ID_FIELD_NAME + " + \\\" not \\\" + params.workerId);"
+ " } else {"
+ " ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;"
+ " }"
+ " }"
+ " if (ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " != null && ctx._source" + SUCCESSOR_ITEMS_FIELD_NAME + "!= params.successorWorkItems) {"
+ " throw new IllegalArgumentException(\\\"The " + SUCCESSOR_ITEMS_FIELD_NAME + " field cannot be updated with a different value.\\\")"
+ " }"
+ " ctx._source." + SUCCESSOR_ITEMS_FIELD_NAME + " = params.successorWorkItems;"
+ "\"\n"
+ " }\n"
+ "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public void testAddSuccessorWorkItemsPartiallyCompleted() throws Exception {
// Add tbe list of successors to the work item
var body = "{\"doc\": {\"successor_items\": \"" + String.join(",", successorItems) + "\"}}";
var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body);
var responseBody = (new ObjectMapper()).readTree(response.getPayloadBytes());
Assertions.assertEquals(200, response.getStatusCode());
// Create a successor item and then claim it with a long lease.
workCoordinator.createUnassignedWorkItem(successorItems.get(0), testContext::createUnassignedWorkContext);
Expand Down Expand Up @@ -306,6 +305,41 @@ public void testAddSuccessorWorkItemsPartiallyCompleted() throws Exception {
}
}


@Test
public void testAddSuccessorItemsFailsIfAlreadyDifferentSuccessorItems() throws Exception {
// A partially completed successor item will have a `successor_items` field and _some_ of the successor work items will be created
// but not all. This tests that the coordinator handles this case correctly by continuing to make the originally specific successor items.
var testContext = WorkCoordinationTestContext.factory().withAllTracking();
var docId = "R0";
var N_SUCCESSOR_ITEMS = 3;
var successorItems = new ArrayList<String>();
for (int i = 0; i < N_SUCCESSOR_ITEMS; i++) {
successorItems.add(docId + "_successor_" + i);
}


var originalWorkItemExpiration = Duration.ofSeconds(5);
try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(), 3600, "successorTest")) {
Assertions.assertFalse(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
workCoordinator.createUnassignedWorkItem(docId, testContext::createUnassignedWorkContext);
Assertions.assertTrue(workCoordinator.workItemsNotYetComplete(testContext::createItemsPendingContext));
// Claim the work item
getWorkItemAndVerify(testContext, "successorTest", new ConcurrentHashMap<>(), originalWorkItemExpiration, false, false);
var client = httpClientSupplier.get();
// Add an INCORRECT list of successors to the work item
var incorrectSuccessors = "successor_99,successor_98,successor_97";
var body = "{\"doc\": {\"successor_items\": \"" + incorrectSuccessors + "\"}}";
var response = client.makeJsonRequest("POST", ".migrations_working_state/_update/" + docId, null, body);
var responseBody = (new ObjectMapper()).readTree(response.getPayloadBytes());
Assertions.assertEquals(200, response.getStatusCode());

// Now attempt to go through with the correct successor item list
Assertions.assertThrows(IllegalArgumentException.class,
() -> workCoordinator.createSuccessorWorkItemsAndMarkComplete(docId, successorItems, testContext::createSuccessorWorkItemsContext));
}
}

// Create a test where a work item tries to create itself as a successor -- it should fail and NOT be marked as complete. Another worker should pick it up and double the lease time.

@SneakyThrows
Expand Down

0 comments on commit 9af9db4

Please sign in to comment.