Shutdown executors created by RFS LeaseExpireTrigger and KafkaTrafficCaptureSource.

Both of those classes used the DefaultThreadFactory, which creates non-daemon threads; non-daemon threads keep a process alive even after main() has finished.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
gregschohn committed Jun 25, 2024
1 parent 93fd822 commit fe9d32b
Showing 5 changed files with 72 additions and 64 deletions.
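
Before the diffs, a minimal, self-contained sketch of the failure mode the commit message describes (this is illustration, not code from the repository): a scheduler built on the JDK's default thread factory holds the JVM open until it is explicitly shut down.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownDemo {
    public static void main(String[] args) {
        // Executors.defaultThreadFactory() creates non-daemon threads, so this
        // scheduler's worker thread keeps the JVM running after main() returns.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> System.out.println("lease expired"), 1, TimeUnit.HOURS);

        // Without this call the process would linger until the task fired;
        // shutdownNow() cancels pending tasks and lets the worker thread exit.
        scheduler.shutdownNow();
        System.out.println("main() finished; the JVM can now exit");
    }
}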
@@ -101,33 +101,34 @@ public static void main(String[] args) throws Exception {
                 .parse(args);

         var luceneDirPath = Paths.get(arguments.luceneDirPath);
-        var processManager = new LeaseExpireTrigger(workItemId->{
+        try (var processManager = new LeaseExpireTrigger(workItemId->{
             log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
             System.exit(PROCESS_TIMED_OUT);
-        }, Clock.systemUTC());
-        var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
-                TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());
-
-        TryHandlePhaseFailure.executeWithTryCatch(() -> {
-            log.info("Running RfsWorker");
-
-            OpenSearchClient targetClient =
-                    new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
-            DocumentReindexer reindexer = new DocumentReindexer(targetClient);
-
-            SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
-                    new S3Uri(arguments.s3RepoUri), arguments.s3Region);
-            SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
-
-            IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
-            ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
-            DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
-            SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
-                    luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
-
-            run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
-                    arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
-        });
+        }, Clock.systemUTC())) {
+            var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
+                    TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());
+
+            TryHandlePhaseFailure.executeWithTryCatch(() -> {
+                log.info("Running RfsWorker");
+
+                OpenSearchClient targetClient =
+                        new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
+                DocumentReindexer reindexer = new DocumentReindexer(targetClient);
+
+                SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
+                        new S3Uri(arguments.s3RepoUri), arguments.s3Region);
+                SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
+
+                IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
+                ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
+                DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
+                SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
+                        luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
+
+                run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
+                        arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
+            });
+        }
     }

     public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocumentsReader> readerFactory,
40 changes: 20 additions & 20 deletions DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
@@ -242,32 +242,32 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
                 }
                 return d;
             };
-            var processManager = new LeaseExpireTrigger(workItemId->{
+            try (var processManager = new LeaseExpireTrigger(workItemId->{
                 log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log();
                 shouldThrow.set(true);
-            });
-
-            DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
-            SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
-                    tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
-
-            SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
-            IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
-            ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
-
-            return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
-                    new DocumentReindexer(new OpenSearchClient(targetAddress, null)),
-                    new OpenSearchWorkCoordinator(
-                            new ApacheHttpClient(new URI(targetAddress)),
+            })) {
+                DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
+                SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
+                        tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
+
+                SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
+                IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
+                ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
+
+                return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
+                        new DocumentReindexer(new OpenSearchClient(targetAddress, null)),
+                        new OpenSearchWorkCoordinator(
+                                new ApacheHttpClient(new URI(targetAddress)),
                                 // new ReactorHttpClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(),
                                 // null, null)),
-                            TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
-                    processManager,
-                    indexMetadataFactory,
-                    snapshotName,
-                    shardMetadataFactory,
-                    unpackerFactory,
-                    16*1024*1024);
+                                TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
+                        processManager,
+                        indexMetadataFactory,
+                        snapshotName,
+                        shardMetadataFactory,
+                        unpackerFactory,
+                        16 * 1024 * 1024);
+            }
         } finally {
             deleteTree(tempDir);
         }
37 changes: 19 additions & 18 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -143,7 +143,10 @@ public static void main(String[] args) throws Exception {
         final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
         final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

-        try {
+        try (var processManager = new LeaseExpireTrigger(workItemId -> {
+            log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
+            System.exit(PROCESS_TIMED_OUT);
+        }, Clock.systemUTC())) {
             log.info("Running RfsWorker");
             OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
             OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
@@ -167,23 +170,21 @@ public static void main(String[] args) throws Exception {
             var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
                     luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
             DocumentReindexer reindexer = new DocumentReindexer(targetClient);
-            var processManager = new LeaseExpireTrigger(workItemId->{
-                log.error("terminating RunRfsWorker because its lease has expired for "+workItemId);
-                System.exit(PROCESS_TIMED_OUT);
-            }, Clock.systemUTC());
-            var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
-                    5, UUID.randomUUID().toString());
-            var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
-            new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
-            new DocumentsRunner(scopedWorkCoordinator,
-                    (name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard),
-                    unpackerFactory,
-                    path -> new LuceneDocumentsReader(path),
-                    reindexer)
-                    .migrateNextShard();
-        } catch (Exception e) {
-            log.error("Unexpected error running RfsWorker", e);
-            throw e;
+            try {
+                var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
+                        5, UUID.randomUUID().toString());
+                var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
+                new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
+                new DocumentsRunner(scopedWorkCoordinator,
+                        (name, shard) -> shardMetadataFactory.fromRepo(snapshotName, name, shard),
+                        unpackerFactory,
+                        path -> new LuceneDocumentsReader(path),
+                        reindexer)
+                        .migrateNextShard();
+            } catch (Exception e) {
+                log.error("Unexpected error running RfsWorker", e);
+                throw e;
+            }
         }
     }
 }
7 changes: 6 additions & 1 deletion RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java
@@ -16,7 +16,7 @@
  * time unless the work is marked as complete before that expiration. This class may, but does not need to,
  * synchronize its clock with an external source of truth for better accuracy.
  */
-public class LeaseExpireTrigger {
+public class LeaseExpireTrigger implements AutoCloseable {
    private final ScheduledExecutorService scheduledExecutorService;
    final ConcurrentHashMap<String, Instant> workItemToLeaseMap;
    final Consumer<String> onLeaseExpired;
@@ -47,4 +47,9 @@ public void registerExpiration(String workItemId, Instant killTime) {
    public void markWorkAsCompleted(String workItemId) {
        workItemToLeaseMap.remove(workItemId);
    }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutorService.shutdownNow();
+    }
 }
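
With LeaseExpireTrigger now AutoCloseable, callers can scope it with try-with-resources, as the RfsMigrateDocuments, FullTest, and RunRfsWorker hunks above do. A hypothetical usage sketch follows; the work-item id and lease duration are illustrative, while the constructor and method signatures match the diffs.

import java.time.Clock;
import java.time.Instant;
import com.rfs.cms.LeaseExpireTrigger;

class LeaseExpireTriggerUsage {
    static void runLeasedWork() throws Exception {
        // try-with-resources guarantees the trigger's scheduler is torn down,
        // so its worker thread no longer keeps the JVM alive after the work ends.
        try (var trigger = new LeaseExpireTrigger(
                workItemId -> System.err.println("lease expired for " + workItemId),
                Clock.systemUTC())) {
            trigger.registerExpiration("shard-0", Instant.now().plusSeconds(300)); // illustrative id/lease
            // ... perform the leased work ...
            trigger.markWorkAsCompleted("shard-0"); // mark done so the expiration callback won't fire
        } // close() invokes scheduledExecutorService.shutdownNow()
    }
}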
@@ -240,5 +240,6 @@ public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) {
     @Override
     public void close() throws IOException, InterruptedException, ExecutionException {
         kafkaExecutor.submit(trackingKafkaConsumer::close).get();
+        kafkaExecutor.shutdownNow();
     }
 }
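
The ordering in this close() matters: KafkaConsumer is not safe for concurrent access, so the consumer's close() is submitted to kafkaExecutor, the thread that owns all consumer calls, and awaited with get() before the executor itself is stopped. A generic, hypothetical sketch of that pattern (the field names are illustrative, not this class's actual members):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ConsumerShutdownSketch {
    private final ExecutorService kafkaExecutor = Executors.newSingleThreadExecutor();
    private final KafkaConsumer<String, byte[]> consumer;

    ConsumerShutdownSketch(KafkaConsumer<String, byte[]> consumer) {
        this.consumer = consumer;
    }

    void close() throws Exception {
        // KafkaConsumer is not thread-safe, so close() must run on the single
        // thread that performs all other consumer calls; get() waits for it.
        kafkaExecutor.submit(consumer::close).get();
        // Only then stop the executor so its worker thread exits and
        // stops pinning the JVM.
        kafkaExecutor.shutdownNow();
    }
}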
