
Commit

Shutdown executors created by RFS LeaseExpireTrigger and KafkaTrafficCaptureSource.

Both of those classes used the DefaultThreadFactory, which creates non-daemon threads, and non-daemon threads keep a process alive even after main() has finished.

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
gregschohn committed Jun 25, 2024
1 parent b62d0a3 commit c4d682f
Showing 6 changed files with 52 additions and 44 deletions.
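
For context on the bug: an executor backed by non-daemon threads pins the JVM open after main() returns, because the runtime waits for every non-daemon thread to die. A minimal, self-contained sketch (not project code) of the failure mode and the fix this commit applies:

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorLifecycleDemo {
    public static void main(String[] args) throws InterruptedException {
        // The default thread factory creates non-daemon threads, so an idle
        // pool alone keeps the process alive once main() returns.
        var scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> System.out.println("lease check"), 100, TimeUnit.MILLISECONDS);
        Thread.sleep(200); // let the scheduled task run
        // Comment out the next line and the process never exits.
        scheduler.shutdownNow();
    }
}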
@@ -106,34 +106,35 @@ public static void main(String[] args) throws Exception {
             .parse(args);
 
         var luceneDirPath = Paths.get(arguments.luceneDirPath);
-        var processManager = new LeaseExpireTrigger(workItemId->{
+        try (var processManager = new LeaseExpireTrigger(workItemId->{
             log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
             System.exit(PROCESS_TIMED_OUT);
-        }, Clock.systemUTC());
-        var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
-                TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());
-
-        TryHandlePhaseFailure.executeWithTryCatch(() -> {
-            log.info("Running RfsWorker");
-
-            OpenSearchClient targetClient =
-                    new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
-            DocumentReindexer reindexer = new DocumentReindexer(targetClient);
-
-            SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
-                    new S3Uri(arguments.s3RepoUri), arguments.s3Region);
-            SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
-
-            IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
-            ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
-            DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
-            SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
-                    luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
-
-            run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
-                    arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory,
-                    arguments.maxShardSizeBytes);
-        });
+        }, Clock.systemUTC())) {
+            var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
+                    TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());
+
+            TryHandlePhaseFailure.executeWithTryCatch(() -> {
+                log.info("Running RfsWorker");
+
+                OpenSearchClient targetClient =
+                        new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
+                DocumentReindexer reindexer = new DocumentReindexer(targetClient);
+
+                SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
+                        new S3Uri(arguments.s3RepoUri), arguments.s3Region);
+                SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
+
+                IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
+                ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
+                DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
+                SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
+                        luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
+
+                run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
+                        arguments.snapshotName, arguments.indexAllowlist, shardMetadataFactory, unpackerFactory,
+                        arguments.maxShardSizeBytes);
+            });
+        }
     }
 
     public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocumentsReader> readerFactory,
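
The try-with-resources wrapper matters beyond the happy path: TryHandlePhaseFailure can throw, and close() now runs on every exit from main(), so the trigger's scheduler thread cannot outlive the work. A reduced sketch (hypothetical names) of that guarantee:

class NoisyResource implements AutoCloseable {
    @Override
    public void close() {
        System.out.println("closed"); // runs on the throwing path too
    }
}

class CloseOnEveryPathDemo {
    public static void main(String[] args) {
        try (var r = new NoisyResource()) {
            throw new RuntimeException("phase failure");
        } catch (RuntimeException e) {
            // By the time we get here, close() has already printed "closed".
            System.out.println("caught: " + e.getMessage());
        }
    }
}

(The System.exit in the expiry callback still bypasses close(), but on that path the process is being killed anyway.)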
@@ -238,18 +238,17 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
             throws RfsMigrateDocuments.NoWorkLeftException
     {
         var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
-        try {
-            var shouldThrow = new AtomicBoolean();
+        var shouldThrow = new AtomicBoolean();
+        try (var processManager = new LeaseExpireTrigger(workItemId->{
+            log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log();
+            shouldThrow.set(true);
+        })) {
             UnaryOperator<Document> terminatingDocumentFilter = d -> {
                 if (shouldThrow.get()) {
                     throw new LeasePastError();
                 }
                 return d;
             };
-            var processManager = new LeaseExpireTrigger(workItemId->{
-                log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log();
-                shouldThrow.set(true);
-            });
 
             DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
             SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
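
The test's trick generalizes: the expiry callback only flips a flag, and the document pipeline checks it and fails fast, keeping all assertion logic on the test thread. A self-contained sketch with hypothetical names:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;

class LeaseExpirySketch {
    static class LeasePastError extends Error {}

    public static void main(String[] args) {
        var shouldThrow = new AtomicBoolean();
        // Stand-in for LeaseExpireTrigger firing its expiration callback.
        Runnable onLeaseExpired = () -> shouldThrow.set(true);

        UnaryOperator<String> terminatingFilter = doc -> {
            if (shouldThrow.get()) {
                throw new LeasePastError(); // aborts processing mid-stream
            }
            return doc;
        };

        terminatingFilter.apply("doc-1"); // passes through
        onLeaseExpired.run();             // the lease "expires"
        try {
            terminatingFilter.apply("doc-2");
        } catch (LeasePastError e) {
            System.out.println("processing aborted after lease expiry");
        }
    }
}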
9 changes: 4 additions & 5 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -144,7 +144,10 @@ public static void main(String[] args) throws Exception {
         final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
         final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
 
-        try {
+        try (var processManager = new LeaseExpireTrigger(workItemId -> {
+            log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
+            System.exit(PROCESS_TIMED_OUT);
+        }, Clock.systemUTC())) {
             log.info("Running RfsWorker");
             OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
             OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
@@ -168,10 +171,6 @@ public static void main(String[] args) throws Exception {
             var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
                     luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
             DocumentReindexer reindexer = new DocumentReindexer(targetClient);
-            var processManager = new LeaseExpireTrigger(workItemId->{
-                log.error("terminating RunRfsWorker because its lease has expired for "+workItemId);
-                System.exit(PROCESS_TIMED_OUT);
-            }, Clock.systemUTC());
             var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
                     5, UUID.randomUUID().toString());
             var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
7 changes: 6 additions & 1 deletion RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java
@@ -16,7 +16,7 @@
  * time unless the work is marked as complete before that expiration. This class may, but does not need to,
  * synchronize its clock with an external source of truth for better accuracy.
  */
-public class LeaseExpireTrigger {
+public class LeaseExpireTrigger implements AutoCloseable {
     private final ScheduledExecutorService scheduledExecutorService;
     final ConcurrentHashMap<String, Instant> workItemToLeaseMap;
     final Consumer<String> onLeaseExpired;
@@ -47,4 +47,9 @@ public void registerExpiration(String workItemId, Instant killTime) {
     public void markWorkAsCompleted(String workItemId) {
         workItemToLeaseMap.remove(workItemId);
     }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutorService.shutdownNow();
+    }
 }
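
Note the choice of shutdownNow() over shutdown(): the former interrupts the worker and discards queued expiration checks, while the latter would let them drain first. For a trigger whose owner is done with its leases, abandoning pending checks is exactly right. A hypothetical reduction of the lifecycle this enables:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledTrigger implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void arm(Runnable onExpired, long delayMillis) {
        scheduler.schedule(onExpired, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // cancel pending checks, interrupt the worker
    }
}

class TriggerDemo {
    public static void main(String[] args) {
        try (var trigger = new ScheduledTrigger()) {
            trigger.arm(() -> System.out.println("lease expired"), 10_000);
        } // close() runs here; the pending expiration never fires
        // With the worker thread gone, the JVM can exit normally.
    }
}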
@@ -33,6 +33,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -75,6 +77,7 @@ public class KafkaTrafficCaptureSource implements ISimpleTrafficCaptureSource {
     private final AtomicLong trafficStreamsRead;
     private final KafkaBehavioralPolicy behavioralPolicy;
     private final ChannelContextManager channelContextManager;
+    private final AtomicBoolean isClosed;
 
     public KafkaTrafficCaptureSource(@NonNull RootReplayerContext globalContext,
                                      Consumer<String, byte[]> kafkaConsumer, String topic, Duration keepAliveInterval) {
@@ -95,6 +98,7 @@ public KafkaTrafficCaptureSource(@NonNull RootReplayerContext globalContext,
         this.behavioralPolicy = behavioralPolicy;
         kafkaConsumer.subscribe(Collections.singleton(topic), trackingKafkaConsumer);
         kafkaExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("kafkaConsumerThread"));
+        isClosed = new AtomicBoolean(false);
     }
 
     private void onKeyFinishedCommitting(ITrafficStreamKey trafficStreamKey) {
@@ -239,6 +243,9 @@ public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) {
 
     @Override
     public void close() throws IOException, InterruptedException, ExecutionException {
-        kafkaExecutor.submit(trackingKafkaConsumer::close).get();
+        if (isClosed.compareAndSet(false, true)) {
+            kafkaExecutor.submit(trackingKafkaConsumer::close).get();
+            kafkaExecutor.shutdownNow();
+        }
    }
 }
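
The compareAndSet makes close() idempotent: the first caller wins the flag, submits the consumer shutdown onto the Kafka thread, and tears the executor down; any later call becomes a no-op instead of a submit() against a dead executor, which would throw RejectedExecutionException (presumably why that import appears in this commit). A generic sketch of the pattern, with hypothetical names:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class SingleThreadWorker implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    @Override
    public void close() throws Exception {
        if (isClosed.compareAndSet(false, true)) {
            executor.submit(() -> System.out.println("final cleanup")).get();
            executor.shutdownNow(); // stop the worker so the JVM can exit
        }
        // Repeat calls land here without touching the shut-down executor,
        // avoiding RejectedExecutionException from a second submit().
    }
}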
@@ -191,12 +191,9 @@ private static void runTrafficReplayer(TrafficReplayerTopLevel trafficReplayer,
                                            Consumer<SourceTargetCaptureTuple> tupleReceiver,
                                            TimeShifter timeShifter) throws Exception {
         log.info("Starting a new replayer and running it");
-
-        try (var os = new NullOutputStream();
-             var trafficSource = captureSourceSupplier.get();
-             var blockingTrafficSource = new BlockingTrafficSource(trafficSource, Duration.ofMinutes(2))) {
+        try (var trafficSource = new BlockingTrafficSource(captureSourceSupplier.get(), Duration.ofMinutes(2))) {
             trafficReplayer.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(70), Duration.ofSeconds(30),
-                    blockingTrafficSource, timeShifter, tupleReceiver);
+                    trafficSource, timeShifter, tupleReceiver);
         }
     }
 
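
Collapsing three resources into one assumes BlockingTrafficSource closes the capture source it wraps (the unused NullOutputStream is simply dropped). If the wrapper did not close its delegate, this refactor would leak the inner source; a sketch of the delegate-closing contract it relies on, with hypothetical names:

interface TrafficSource extends AutoCloseable {}

// A wrapper whose close() also closes the source it decorates, so a single
// entry in try-with-resources covers both.
class BlockingSourceWrapper implements TrafficSource {
    private final TrafficSource delegate;

    BlockingSourceWrapper(TrafficSource delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() throws Exception {
        delegate.close(); // without this, the inner source would leak
    }
}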
