Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Abstract out remote segment garbage collection logic #13221

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -27,6 +35,8 @@
public class RemoteStoreUtils {
public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length();

private static final Logger logger = LogManager.getLogger(RemoteStoreUtils.class);

/**
* URL safe base 64 character set. This must not be changed as this is used in deriving the base64 equivalent of binary.
*/
Expand Down Expand Up @@ -146,4 +156,26 @@ static String longToCompositeBase64AndBinaryEncoding(long value, int len) {
assert base64DecimalValue >= 0 && base64DecimalValue < 64;
return URL_BASE64_CHARSET[base64DecimalValue] + binaryPart;
}

/**
* Returns if the latest segments_N file is uploaded to the remote store. We fetch latest segment_N filename that is
* present in local directory and check if it is present in remote directory.
* @param storeDirectory instance of local directory
* @param remoteDirectory instance of RemoteSegmentStoreDirectory
* @return true if latest semgents_N from local is present in remote. If otherwise or on exception, returns false.
*/
public static boolean isLatestSegmentInfosUploadedToRemote(Directory storeDirectory, RemoteSegmentStoreDirectory remoteDirectory) {
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (lastCommittedLocalSegmentFileName != null) {
try (IndexInput indexInput = storeDirectory.openInput(lastCommittedLocalSegmentFileName, IOContext.DEFAULT)) {
String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput));
return remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, checksum);
}
}
} catch (Exception e) {
logger.info("Exception occurred while checking isLatestSegmentInfosUploadedToRemote", e);
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.RemoteStoreSettings;

public class CommitTriggeredRemoteSegmentGarbageCollector implements RemoteSegmentGarbageCollector {
private final Logger logger;
private final Directory storeDirectory;
private final RemoteSegmentStoreDirectory remoteDirectory;

private final RemoteStoreSettings remoteStoreSettings;

public CommitTriggeredRemoteSegmentGarbageCollector(
ShardId shardId,
Directory storeDirectory,
RemoteSegmentStoreDirectory remoteDirectory,
RemoteStoreSettings remoteStoreSettings
) {
logger = Loggers.getLogger(getClass(), shardId);
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
public void deleteStaleSegments(Listener listener) {
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid triggering delete post each refresh.
if (RemoteStoreUtils.isLatestSegmentInfosUploadedToRemote(storeDirectory, remoteDirectory) == false) {
remoteDirectory.deleteStaleSegmentsAsync(remoteStoreSettings.getMinRemoteSegmentMetadataFiles(), new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
listener.onSuccess(unused);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
String skipReason =
"Skipping garbage collection, CommitTriggeredRemoteSegmentGarbageCollector only triggers deletion on commit";
logger.debug(skipReason);
listener.onSkip(skipReason);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

public interface RemoteSegmentGarbageCollector {

interface Listener {

/**
* Called after the successful deletion of stale segments from remote store
*/
void onSuccess(Void response);

/**
* Called on exception while deleting stale segments from remote store
*/
void onFailure(Exception e);

/**
* Called on skipping the garbage collection
*/
void onSkip(String reason);
Comment on lines +25 to +28
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a better contract would be shouldGarbageCollect() that returns a boolean. We can add additional safety constraints explicitly

}

Listener DEFAULT_NOOP_LISTENER = new Listener() {
@Override
public void onSuccess(Void response) {}

@Override
public void onFailure(Exception e) {}

@Override
public void onSkip(String reason) {}
};

/**
* Deletes stale segments from remote store and notifies the listener.
*/
void deleteStaleSegments(Listener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
Expand Down Expand Up @@ -89,6 +90,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
private volatile long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final RemoteSegmentGarbageCollector remoteSegmentGarbageCollector;

public RemoteStoreRefreshListener(
IndexShard indexShard,
Expand Down Expand Up @@ -116,6 +118,12 @@ public RemoteStoreRefreshListener(
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.remoteSegmentGarbageCollector = new CommitTriggeredRemoteSegmentGarbageCollector(
indexShard.shardId(),
storeDirectory,
remoteDirectory,
indexShard.getRemoteStoreSettings()
);
}

@Override
Expand Down Expand Up @@ -172,7 +180,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe()
|| isRefreshAfterCommit()
|| isRemoteSegmentStoreInSync() == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
Expand Down Expand Up @@ -220,12 +228,10 @@ private boolean syncSegments() {
try {
try {
initializeRemoteDirectoryOnTermUpdate();
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles());
}

// We notify the garbage collector each time, the implementation of garbage collector will decide on
// trigger and frequency of deleting stale segments.
remoteSegmentGarbageCollector.deleteStaleSegments(RemoteSegmentGarbageCollector.DEFAULT_NOOP_LISTENER);

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -362,23 +368,12 @@ protected String getRetryThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
}

private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

/**
* Returns if the current refresh has happened after a commit.
* @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false.
*/
private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
} catch (Exception e) {
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);
}
return false;
private boolean isRefreshAfterCommit() {
return RemoteStoreUtils.isLatestSegmentInfosUploadedToRemote(storeDirectory, remoteDirectory) == false;
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
Expand Down
Loading