diff --git a/DocumentsFromSnapshotMigration/docker/entrypoint.sh b/DocumentsFromSnapshotMigration/docker/entrypoint.sh index 7deb44da9..32473136d 100755 --- a/DocumentsFromSnapshotMigration/docker/entrypoint.sh +++ b/DocumentsFromSnapshotMigration/docker/entrypoint.sh @@ -43,5 +43,6 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then fi fi -echo "Executing RFS Command" -eval $RFS_COMMAND +[ -z "$RFS_COMMAND" ] && \ +{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \ +until ! { echo "Running command $RFS_COMMAND"; eval "$RFS_COMMAND"; }; do :; done \ No newline at end of file diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index e50369a53..592c09dd1 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -45,7 +45,8 @@ @Slf4j public class RfsMigrateDocuments { - public static final int PROCESS_TIMED_OUT = 2; + public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2; + public static final int NO_WORK_LEFT_EXIT_CODE = 3; public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5; public static final String LOGGING_MDC_WORKER_ID = "workerId"; @@ -184,15 +185,12 @@ public static void main(String[] args) throws Exception { var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? 
Paths.get(arguments.snapshotLocalDir) : null; var connectionContext = arguments.targetArgs.toConnectionContext(); - try (var processManager = new LeaseExpireTrigger(workItemId -> { - log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); - System.exit(PROCESS_TIMED_OUT); - }, Clock.systemUTC()); - var workCoordinator = new OpenSearchWorkCoordinator( - new CoordinateWorkHttpClient(connectionContext), - TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, - workerId - )) { + try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC()); + var workCoordinator = new OpenSearchWorkCoordinator( + new CoordinateWorkHttpClient(connectionContext), + TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, + workerId) + ) { MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main OpenSearchClient targetClient = new OpenSearchClient(connectionContext); DocumentReindexer reindexer = new DocumentReindexer(targetClient, @@ -233,12 +231,20 @@ public static void main(String[] args) throws Exception { unpackerFactory, arguments.maxShardSizeBytes, context); + } catch (NoWorkLeftException e) { + log.atWarn().setMessage("No work left to acquire. 
Exiting with error code to signal that.").log(); + System.exit(NO_WORK_LEFT_EXIT_CODE); } catch (Exception e) { log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log(); throw e; } } + private static void exitOnLeaseTimeout(String workItemId) { + log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); + System.exit(PROCESS_TIMED_OUT_EXIT_CODE); + } + private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) { var compositeContextTracker = new CompositeContextTracker( new ActiveContextTracker(), diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index b577ddd3e..ceecdfd54 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -10,9 +10,14 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -47,6 +52,37 @@ enum FailHow { WITH_DELAYS } + @AllArgsConstructor + @Getter + private static class RunData { + Path tempDirSnapshot; + Path tempDirLucene; + ToxiProxyWrapper proxyContainer; + } + + @Test + @Tag("longTest") + public void testExitsZeroThenThreeForSimpleSetup() throws Exception { + testProcess(3, + d -> { + var firstExitCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + 
Assertions.assertEquals(0, firstExitCode); + for (int i=0; i<10; ++i) { + var secondExitCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + if (secondExitCode != 0) { + var lastErrorCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + Assertions.assertEquals(secondExitCode, lastErrorCode); + return lastErrorCode; + } + } + Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code"); + return -1; // won't be evaluated + }); + } + @ParameterizedTest @CsvSource(value = { // This test will go through a proxy that doesn't add any defects and the process will use defaults @@ -62,6 +98,12 @@ enum FailHow { "WITH_DELAYS, 2" }) public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception { final var failHow = FailHow.valueOf(failAfterString); + testProcess(expectedExitCode, + d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow)); + } + + @SneakyThrows + private void testProcess(int expectedExitCode, Function<RunData, Integer> processRunner) { final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2); @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); - int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow); + int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer)); log.atInfo().setMessage("Process exited with code: " + actualExitCode).log(); // Check if the exit code is as expected @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC } } + @SneakyThrows private static int runProcessAgainstToxicTarget( Path 
tempDirSnapshot, Path tempDirLucene, ToxiProxyWrapper proxyContainer, - FailHow failHow - ) throws IOException, InterruptedException { + FailHow failHow) + { String targetAddress = proxyContainer.getProxyUriAsString(); var tp = proxyContainer.getProxy(); if (failHow == FailHow.AT_STARTUP) { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py index a95dc9f8d..70f53b4e1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py @@ -62,7 +62,8 @@ def setup_backfill(request): assert metadata_result.success backfill_start_result: CommandResult = backfill.start() assert backfill_start_result.success - backfill_scale_result: CommandResult = backfill.scale(units=10) + # small enough to allow containers to be reused, big enough to test scaling out + backfill_scale_result: CommandResult = backfill.scale(units=2) assert backfill_scale_result.success