diff --git a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java index d8007c971..f86ea53dc 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java @@ -101,33 +101,34 @@ public static void main(String[] args) throws Exception { .parse(args); var luceneDirPath = Paths.get(arguments.luceneDirPath); - var processManager = new LeaseExpireTrigger(workItemId->{ + try (var processManager = new LeaseExpireTrigger(workItemId->{ log.error("terminating RunRfsWorker because its lease has expired for " + workItemId); System.exit(PROCESS_TIMED_OUT); - }, Clock.systemUTC()); - var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)), - TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()); - - TryHandlePhaseFailure.executeWithTryCatch(() -> { - log.info("Running RfsWorker"); - - OpenSearchClient targetClient = - new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false); - DocumentReindexer reindexer = new DocumentReindexer(targetClient); - - SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath), - new S3Uri(arguments.s3RepoUri), arguments.s3Region); - SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); - - IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); - ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); - DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); - SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, - luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); - - run(LuceneDocumentsReader::new, reindexer, 
workCoordinator, processManager, indexMetadataFactory, - arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes); - }); + }, Clock.systemUTC())) { + var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)), + TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()); + + TryHandlePhaseFailure.executeWithTryCatch(() -> { + log.info("Running RfsWorker"); + + OpenSearchClient targetClient = + new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false); + DocumentReindexer reindexer = new DocumentReindexer(targetClient); + + SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath), + new S3Uri(arguments.s3RepoUri), arguments.s3Region); + SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); + + IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); + ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, + luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); + + run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory, + arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes); + }); + } } public static DocumentsRunner.CompletionStatus run(Function readerFactory, diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java index 1d1b11cc3..75dbbcf4d 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java @@ -242,32 +242,32 @@ private 
DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep } return d; }; - var processManager = new LeaseExpireTrigger(workItemId->{ + try (var processManager = new LeaseExpireTrigger(workItemId->{ log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log(); shouldThrow.set(true); - }); - - DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); - SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, - tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); + })) { + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, + tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); - SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); - IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); - ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); + SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); + IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); + ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); - return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter), - new DocumentReindexer(new OpenSearchClient(targetAddress, null)), - new OpenSearchWorkCoordinator( - new ApacheHttpClient(new URI(targetAddress)), + return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter), + new DocumentReindexer(new OpenSearchClient(targetAddress, null)), + new OpenSearchWorkCoordinator( + new ApacheHttpClient(new URI(targetAddress)), // new ReactorHttpClient(new 
ConnectionDetails(osTargetContainer.getHttpHostAddress(), // null, null)), - TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()), - processManager, - indexMetadataFactory, - snapshotName, - shardMetadataFactory, - unpackerFactory, - 16*1024*1024); + TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()), + processManager, + indexMetadataFactory, + snapshotName, + shardMetadataFactory, + unpackerFactory, + 16 * 1024 * 1024); + } } finally { deleteTree(tempDir); } diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java index 768f2f851..da2b6fd9b 100644 --- a/RFS/src/main/java/com/rfs/RunRfsWorker.java +++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java @@ -143,7 +143,10 @@ public static void main(String[] args) throws Exception { final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); - try { + try (var processManager = new LeaseExpireTrigger(workItemId -> { + log.error("terminating RunRfsWorker because its lease has expired for " + workItemId); + System.exit(PROCESS_TIMED_OUT); + }, Clock.systemUTC())) { log.info("Running RfsWorker"); OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection); OpenSearchClient targetClient = new OpenSearchClient(targetConnection); @@ -167,23 +170,21 @@ public static void main(String[] args) throws Exception { var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); DocumentReindexer reindexer = new DocumentReindexer(targetClient); - var processManager = new LeaseExpireTrigger(workItemId->{ - log.error("terminating RunRfsWorker because its lease has expired for "+workItemId); - System.exit(PROCESS_TIMED_OUT); - }, Clock.systemUTC()); - var workCoordinator = new OpenSearchWorkCoordinator(new 
ApacheHttpClient(new URI(targetHost)), - 5, UUID.randomUUID().toString()); - var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager); - new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName); - new DocumentsRunner(scopedWorkCoordinator, - (name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard), - unpackerFactory, - path -> new LuceneDocumentsReader(path), - reindexer) - .migrateNextShard(); - } catch (Exception e) { - log.error("Unexpected error running RfsWorker", e); - throw e; + try { + var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)), + 5, UUID.randomUUID().toString()); + var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager); + new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName); + new DocumentsRunner(scopedWorkCoordinator, + (name, shard) -> shardMetadataFactory.fromRepo(snapshotName, name, shard), + unpackerFactory, + path -> new LuceneDocumentsReader(path), + reindexer) + .migrateNextShard(); + } catch (Exception e) { + log.error("Unexpected error running RfsWorker", e); + throw e; + } } } } diff --git a/RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java b/RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java index d9b23888f..00b641adb 100644 --- a/RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java +++ b/RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java @@ -16,7 +16,7 @@ * time unless the work is marked as complete before that expiration. This class may, but does not need to, * synchronize its clock with an external source of truth for better accuracy. 
*/ -public class LeaseExpireTrigger { +public class LeaseExpireTrigger implements AutoCloseable { private final ScheduledExecutorService scheduledExecutorService; final ConcurrentHashMap<String, Instant> workItemToLeaseMap; final Consumer<String> onLeaseExpired; @@ -47,4 +47,9 @@ public void registerExpiration(String workItemId, Instant killTime) { public void markWorkAsCompleted(String workItemId) { workItemToLeaseMap.remove(workItemId); } + + @Override + public void close() throws Exception { + scheduledExecutorService.shutdownNow(); + } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java index 3dd4b7b5e..740ef9f2c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.java @@ -240,5 +240,6 @@ public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) { @Override public void close() throws IOException, InterruptedException, ExecutionException { kafkaExecutor.submit(trackingKafkaConsumer::close).get(); + kafkaExecutor.shutdownNow(); } }