Skip to content

Commit

Permalink
Cleanup file cache on deleting index/shard directory (#11443) (#12154)
Browse files Browse the repository at this point in the history
* cleanup file cache on deleting index/shard directory



* add index store listener



---------


(cherry picked from commit c564ee3)

Signed-off-by: panguixin <panguixin@bytedance.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 4731efc commit 11bea3d
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
Expand Down Expand Up @@ -859,4 +862,75 @@ private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, i
// Verifies if all the shards (primary and replica) have been deleted
assertEquals(numCacheFolderCount, searchNodeFileCachePaths.size());
}

public void testRelocateSearchableSnapshotIndex() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName = "test-idx-1";
final String restoredIndexName = indexName + "-copy";
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

String searchNode1 = internalCluster().startSearchOnlyNodes(1).get(0);
internalCluster().validateClusterFormed();
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

String searchNode2 = internalCluster().startSearchOnlyNodes(1).get(0);
internalCluster().validateClusterFormed();

final Index index = resolveIndex(restoredIndexName);
assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, true);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);

// relocate the shard from node1 to node2
client.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(restoredIndexName, 0, searchNode1, searchNode2))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client.admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertDocCount(restoredIndexName, 100L);

assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, false);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, true);
deleteIndicesAndEnsureGreen(client, restoredIndexName);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
final ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardId);

assertBusy(() -> {
assertTrue(
"shard state path should " + (exists ? "exist" : "not exist"),
Files.exists(shardPath.getShardStatePath()) == exists
);
assertTrue("shard cache path should " + (exists ? "exist" : "not exist"), Files.exists(shardPath.getDataPath()) == exists);
}, 30, TimeUnit.SECONDS);

final Path indexDataPath = node.getNodeEnvironment().fileCacheNodePath().fileCachePath.resolve(index.getUUID());
final Path indexPath = node.getNodeEnvironment().fileCacheNodePath().indicesPath.resolve(index.getUUID());
assertBusy(() -> {
assertTrue("index path should " + (exists ? "exist" : "not exist"), Files.exists(indexDataPath) == exists);
assertTrue("index cache path should " + (exists ? "exist" : "not exist"), Files.exists(indexPath) == exists);
}, 30, TimeUnit.SECONDS);
}
}
43 changes: 41 additions & 2 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private final IndexStoreListener indexStoreListener;

/**
* Maximum number of data nodes that should run in an environment.
*/
Expand Down Expand Up @@ -295,18 +297,23 @@ public void close() {
}
}

public NodeEnvironment(Settings settings, Environment environment) throws IOException {
this(settings, environment, IndexStoreListener.EMPTY);
}

/**
* Setup the environment.
* @param settings settings from opensearch.yml
*/
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) {
nodePaths = null;
fileCacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
this.indexStoreListener = IndexStoreListener.EMPTY;
return;
}
boolean success = false;
Expand Down Expand Up @@ -385,6 +392,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
this.indexStoreListener = indexStoreListener;
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -577,6 +585,9 @@ public static void acquireFSLockForPaths(IndexSettings indexSettings, Path... sh
public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException {
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";

indexStoreListener.beforeShardPathDeleted(shardId, indexSettings, this);

final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
Expand Down Expand Up @@ -653,6 +664,8 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
indexStoreListener.beforeIndexPathDeleted(index, indexSettings, this);

final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
Expand All @@ -663,6 +676,18 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
}
}

private void deleteIndexFileCacheDirectory(Index index) {
final Path indexCachePath = fileCacheNodePath().fileCachePath.resolve(index.getUUID());
logger.trace("deleting index {} file cache directory, path: [{}]", index, indexCachePath);
if (Files.exists(indexCachePath)) {
try {
IOUtils.rm(indexCachePath);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
}
}
}

/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
* a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released.
Expand Down Expand Up @@ -1387,4 +1412,18 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.inject.Provider;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.cluster.IndicesClusterStateService;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -30,79 +27,90 @@
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
* IndexEventListener to clean up file cache when the index is deleted. The cached entries will be eligible
* IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible
* for eviction when the shard is deleted, but this listener deterministically removes entries from memory and
* from disk at the time of shard deletion as opposed to waiting for the cache to need to perform eviction.
*
* @opensearch.internal
*/
public class FileCacheCleaner implements IndexEventListener {
private static final Logger log = LogManager.getLogger(FileCacheCleaner.class);
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final NodeEnvironment nodeEnvironment;
private final FileCache fileCache;
private final Provider<FileCache> fileCacheProvider;

public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) {
this.nodeEnvironment = nodeEnvironment;
this.fileCache = fileCache;
public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
this.fileCacheProvider = fileCacheProvider;
}

/**
* before shard deleted and after shard closed, cleans up the corresponding index file path entries from FC.
* @param shardId The shard id
* @param settings the shards index settings
* before shard path deleted, cleans up the corresponding index file path entries from FC and delete the corresponding shard file
* cache path.
*
* @param shardId the shard id
* @param indexSettings the index settings
* @param nodeEnvironment the node environment
*/
@Override
public void beforeIndexShardDeleted(ShardId shardId, Settings settings) {
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
cleanupShardFileCache(shardPath);
deleteShardFileCacheDirectory(shardPath);
}
}

/**
* Cleans up the corresponding index file path entries from FileCache
*
* @param shardPath the shard path
*/
private void cleanupShardFileCache(ShardPath shardPath) {
try {
if (isRemoteSnapshot(settings)) {
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fileCache.remove(subPath.toRealPath());
}
final FileCache fc = fileCacheProvider.get();
assert fc != null;
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fc.remove(subPath.toRealPath());
}
}
} catch (IOException ioe) {
log.error(() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardId), ioe);
logger.error(
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
ioe
);
}
}

@Override
public void afterIndexShardDeleted(ShardId shardId, Settings settings) {
if (isRemoteSnapshot(settings)) {
final Path path = ShardPath.loadFileCachePath(nodeEnvironment, shardId).getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
log.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardId), e);
private void deleteShardFileCacheDirectory(ShardPath shardPath) {
final Path path = shardPath.getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);
}
}

/**
* before index path deleted, delete the corresponding index file cache path.
*
* @param index the index
* @param indexSettings the index settings
* @param nodeEnvironment the node environment
*/
@Override
public void afterIndexRemoved(
Index index,
IndexSettings indexSettings,
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason
) {
if (isRemoteSnapshot(indexSettings.getSettings())
&& reason == IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED) {
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final Path indexCachePath = nodeEnvironment.fileCacheNodePath().fileCachePath.resolve(index.getUUID());
if (Files.exists(indexCachePath)) {
try {
IOUtils.rm(indexCachePath);
} catch (IOException e) {
log.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
}
}
}
}

private static boolean isRemoteSnapshot(Settings settings) {
return IndexModule.Type.REMOTE_SNAPSHOT.match(settings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -364,7 +363,6 @@ public class IndicesService extends AbstractLifecycleComponent
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private volatile TimeValue clusterDefaultRefreshInterval;
private volatile TimeValue clusterRemoteTranslogBufferInterval;
private final FileCacheCleaner fileCacheCleaner;

private final SearchRequestStats searchRequestStats;

Expand Down Expand Up @@ -397,7 +395,6 @@ public IndicesService(
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier,
FileCacheCleaner fileCacheCleaner,
SearchRequestStats searchRequestStats,
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
RecoverySettings recoverySettings
Expand Down Expand Up @@ -452,7 +449,6 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCacheCleaner = fileCacheCleaner;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -768,7 +764,6 @@ public void onStoreClosed(ShardId shardId) {
};
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
finalListeners.add(fileCacheCleaner);
final IndexService indexService = createIndexService(
CREATE_INDEX,
indexMetadata,
Expand Down
Loading

0 comments on commit 11bea3d

Please sign in to comment.