Skip to content

Commit

Permalink
Revert RfsMigrateDocuments changes
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <akurait@amazon.com>
  • Loading branch information
AndreKurait committed Jun 25, 2024
1 parent 6e13037 commit 2fcee46
Showing 1 changed file with 23 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,40 +105,29 @@ public static void main(String[] args) throws Exception {
log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
try (var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString())) {
TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");

OpenSearchClient targetClient =
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
while (true) {
try {
var completionStatus = run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager,
indexMetadataFactory, arguments.snapshotName, shardMetadataFactory, unpackerFactory,
arguments.maxShardSizeBytes);
if (completionStatus != DocumentsRunner.CompletionStatus.WORK_COMPLETED) {
log.info("Current completion status: {}", completionStatus);
}
} catch (NoWorkLeftException e) {
log.info("NoWorkLeftException", e);
log.info("Finishing");
break;
}
}
});
}
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");

OpenSearchClient targetClient =
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
});
}

public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocumentsReader> readerFactory,
Expand Down

0 comments on commit 2fcee46

Please sign in to comment.