Skip to content

Commit

Permalink
Run the RFS Container's DocumentMigration application repeatedly as l…
Browse files Browse the repository at this point in the history
…ong it's successful.

Unsuccessful means anything but a 0 exit code.  The application has been cleaned up to return 3 for no work available.  It used to return whatever a top-level exception would create.  That part wasn't necessary, but it seemed like a good idea to test for that to make sure that after running repeatedly, processes would eventually do THAT instead of returning 0, causing infinite loops in the containers.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Oct 2, 2024
1 parent 647f601 commit d60125d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 16 deletions.
5 changes: 3 additions & 2 deletions DocumentsFromSnapshotMigration/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then
fi
fi

echo "Executing RFS Command"
eval $RFS_COMMAND
[ -z "$RFS_COMMAND" ] && \
{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \
until ! { echo "Running command $RFS_COMMAND"; eval "$RFS_COMMAND"; }; do :; done
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@

@Slf4j
public class RfsMigrateDocuments {
public static final int PROCESS_TIMED_OUT = 2;
public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2;
public static final int NO_WORK_LEFT_EXIT_CODE = 3;
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

Expand Down Expand Up @@ -184,15 +185,12 @@ public static void main(String[] args) throws Exception {
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

var connectionContext = arguments.targetArgs.toConnectionContext();
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
)) {
try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId)
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
Expand Down Expand Up @@ -233,12 +231,20 @@ public static void main(String[] args) throws Exception {
unpackerFactory,
arguments.maxShardSizeBytes,
context);
} catch (NoWorkLeftException e) {
log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log();
System.exit(NO_WORK_LEFT_EXIT_CODE);
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
}
}

private static void exitOnLeaseTimeout(String workItemId) {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

Expand Down Expand Up @@ -47,6 +52,37 @@ enum FailHow {
WITH_DELAYS
}

@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
ToxiProxyWrapper proxyContainer;
}

@Test
@Tag("longTest")
public void testExitsZeroThenThreeForSimpleSetup() throws Exception {
testProcess(3,
d -> {
var firstExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(0, firstExitCode);
for (int i=0; i<10; ++i) {
var secondExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
if (secondExitCode != 0) {
var lastErrorCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(secondExitCode, lastErrorCode);
return lastErrorCode;
}
}
Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code");
return -1; // won't be evaluated
});
}

@ParameterizedTest
@CsvSource(value = {
// This test will go through a proxy that doesn't add any defects and the process will use defaults
Expand All @@ -62,6 +98,12 @@ enum FailHow {
"WITH_DELAYS, 2" })
public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception {
final var failHow = FailHow.valueOf(failAfterString);
testProcess(expectedExitCode,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow));
}

@SneakyThrows
private void testProcess(int expectedExitCode, Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2);
Expand Down Expand Up @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC

esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow);
int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();

// Check if the exit code is as expected
Expand All @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
}
}

@SneakyThrows
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer,
FailHow failHow
) throws IOException, InterruptedException {
FailHow failHow)
{
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
if (failHow == FailHow.AT_STARTUP) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def setup_backfill(request):
assert metadata_result.success
backfill_start_result: CommandResult = backfill.start()
assert backfill_start_result.success
backfill_scale_result: CommandResult = backfill.scale(units=10)
# small enough to allow containers to be reused, big enough to test scaling out
backfill_scale_result: CommandResult = backfill.scale(units=2)
assert backfill_scale_result.success


Expand Down

0 comments on commit d60125d

Please sign in to comment.