From cc805f93b0b39be1958e66a5125e33e9219ca510 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 16 Apr 2024 15:31:46 +0530 Subject: [PATCH] Abstract out remote segment garbage collection logic Signed-off-by: Sachin Kale --- .../index/remote/RemoteStoreUtils.java | 32 ++++++++++ ...riggeredRemoteSegmentGarbageCollector.java | 63 +++++++++++++++++++ .../shard/RemoteSegmentGarbageCollector.java | 46 ++++++++++++++ .../shard/RemoteStoreRefreshListener.java | 35 +++++------ 4 files changed, 156 insertions(+), 20 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/shard/CommitTriggeredRemoteSegmentGarbageCollector.java create mode 100644 server/src/main/java/org/opensearch/index/shard/RemoteSegmentGarbageCollector.java diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 4d1d98334c3c4..9f25d4b07383b 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -8,7 +8,15 @@ package org.opensearch.index.remote; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.collect.Tuple; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import java.nio.ByteBuffer; import java.util.Arrays; @@ -27,6 +35,8 @@ public class RemoteStoreUtils { public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length(); + private static final Logger logger = LogManager.getLogger(RemoteStoreUtils.class); + /** * URL safe base 64 character set. This must not be changed as this is used in deriving the base64 equivalent of binary. 
*/ @@ -146,4 +156,26 @@ static String longToCompositeBase64AndBinaryEncoding(long value, int len) { assert base64DecimalValue >= 0 && base64DecimalValue < 64; return URL_BASE64_CHARSET[base64DecimalValue] + binaryPart; } + + /** + * Returns if the latest segments_N file is uploaded to the remote store. We fetch latest segments_N filename that is + * present in local directory and check if it is present in remote directory. + * @param storeDirectory instance of local directory + * @param remoteDirectory instance of RemoteSegmentStoreDirectory + * @return true if latest segments_N from local is present in remote. If otherwise or on exception, returns false. + */ + public static boolean isLatestSegmentInfosUploadedToRemote(Directory storeDirectory, RemoteSegmentStoreDirectory remoteDirectory) { + try { + String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory); + if (lastCommittedLocalSegmentFileName != null) { + try (IndexInput indexInput = storeDirectory.openInput(lastCommittedLocalSegmentFileName, IOContext.DEFAULT)) { + String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput)); + return remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, checksum); + } + } + } catch (Exception e) { + logger.info("Exception occurred while checking isLatestSegmentInfosUploadedToRemote", e); + } + return false; + } } diff --git a/server/src/main/java/org/opensearch/index/shard/CommitTriggeredRemoteSegmentGarbageCollector.java b/server/src/main/java/org/opensearch/index/shard/CommitTriggeredRemoteSegmentGarbageCollector.java new file mode 100644 index 0000000000000..00511265142c4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/CommitTriggeredRemoteSegmentGarbageCollector.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.shard; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.opensearch.common.logging.Loggers; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.indices.RemoteStoreSettings; + +public class CommitTriggeredRemoteSegmentGarbageCollector implements RemoteSegmentGarbageCollector { + private final Logger logger; + private final Directory storeDirectory; + private final RemoteSegmentStoreDirectory remoteDirectory; + + private final RemoteStoreSettings remoteStoreSettings; + + public CommitTriggeredRemoteSegmentGarbageCollector( + ShardId shardId, + Directory storeDirectory, + RemoteSegmentStoreDirectory remoteDirectory, + RemoteStoreSettings remoteStoreSettings + ) { + logger = Loggers.getLogger(getClass(), shardId); + this.storeDirectory = storeDirectory; + this.remoteDirectory = remoteDirectory; + this.remoteStoreSettings = remoteStoreSettings; + } + + @Override + public void deleteStaleSegments(Listener listener) { + // if a new segments_N file is present in local that is not uploaded to remote store yet, it + // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. + // This is done to avoid triggering delete post each refresh. 
+ if (RemoteStoreUtils.isLatestSegmentInfosUploadedToRemote(storeDirectory, remoteDirectory) == false) { + remoteDirectory.deleteStaleSegmentsAsync(remoteStoreSettings.getMinRemoteSegmentMetadataFiles(), new ActionListener() { + @Override + public void onResponse(Void unused) { + listener.onSuccess(unused); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } else { + String skipReason = + "Skipping garbage collection, CommitTriggeredRemoteSegmentGarbageCollector only triggers deletion on commit"; + logger.debug(skipReason); + listener.onSkip(skipReason); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteSegmentGarbageCollector.java b/server/src/main/java/org/opensearch/index/shard/RemoteSegmentGarbageCollector.java new file mode 100644 index 0000000000000..e009b274aedbe --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteSegmentGarbageCollector.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +public interface RemoteSegmentGarbageCollector { + + interface Listener { + + /** + * Called after the successful deletion of stale segments from remote store + */ + void onSuccess(Void response); + + /** + * Called on exception while deleting stale segments from remote store + */ + void onFailure(Exception e); + + /** + * Called on skipping the garbage collection + */ + void onSkip(String reason); + } + + Listener DEFAULT_NOOP_LISTENER = new Listener() { + @Override + public void onSuccess(Void response) {} + + @Override + public void onFailure(Exception e) {} + + @Override + public void onSkip(String reason) {} + }; + + /** + * Deletes stale segments from remote store and notifies the listener. 
+ */ + void deleteStaleSegments(Listener listener); +} diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 351aec6e3af6c..0332a102971e4 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -29,6 +29,7 @@ import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.remote.RemoteSegmentTransferTracker; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; @@ -89,6 +90,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh private volatile long primaryTerm; private volatile Iterator backoffDelayIterator; private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final RemoteSegmentGarbageCollector remoteSegmentGarbageCollector; public RemoteStoreRefreshListener( IndexShard indexShard, @@ -116,6 +118,12 @@ public RemoteStoreRefreshListener( this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; + this.remoteSegmentGarbageCollector = new CommitTriggeredRemoteSegmentGarbageCollector( + indexShard.shardId(), + storeDirectory, + remoteDirectory, + indexShard.getRemoteStoreSettings() + ); } @Override @@ -172,7 +180,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) { // When the shouldSync is called the first time, then 1st condition on primary term is true. But after that // we update the primary term and the same condition would not evaluate to true again in syncSegments. 
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. - || isRefreshAfterCommitSafe() + || isRefreshAfterCommit() || isRemoteSegmentStoreInSync() == false; if (shouldSync || skipPrimaryTermCheck) { return shouldSync; @@ -220,12 +228,10 @@ private boolean syncSegments() { try { try { initializeRemoteDirectoryOnTermUpdate(); - // if a new segments_N file is present in local that is not uploaded to remote store yet, it - // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. - // This is done to avoid delete post each refresh. - if (isRefreshAfterCommit()) { - remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles()); - } + + // We notify the garbage collector each time, the implementation of garbage collector will decide on + // trigger and frequency of deleting stale segments. + remoteSegmentGarbageCollector.deleteStaleSegments(RemoteSegmentGarbageCollector.DEFAULT_NOOP_LISTENER); try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); @@ -362,23 +368,12 @@ protected String getRetryThreadPoolName() { return ThreadPool.Names.REMOTE_REFRESH_RETRY; } - private boolean isRefreshAfterCommit() throws IOException { - String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory); - return (lastCommittedLocalSegmentFileName != null - && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName))); - } - /** * Returns if the current refresh has happened after a commit. * @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false. 
*/ - private boolean isRefreshAfterCommitSafe() { - try { - return isRefreshAfterCommit(); - } catch (Exception e) { - logger.info("Exception occurred in isRefreshAfterCommitSafe", e); - } - return false; + private boolean isRefreshAfterCommit() { + return RemoteStoreUtils.isLatestSegmentInfosUploadedToRemote(storeDirectory, remoteDirectory) == false; } void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)