[Remote Store] Add implementation for path types and change default path type

Signed-off-by: Ashish Singh <ssashish@amazon.com>
ashking94 committed Apr 6, 2024
1 parent e713175 commit 1e5e346
Showing 14 changed files with 631 additions and 64 deletions.
@@ -21,6 +21,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
@@ -56,6 +57,10 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@@ -279,6 +284,11 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
String restoredIndexName1version1 = indexName1 + "-restored-1";
String restoredIndexName1version2 = indexName1 + "-restored-2";

client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.get();
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
Client client = client();
Settings indexSettings = getIndexSettings(1, 0).build();
@@ -476,12 +486,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
}

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
String indexUUID = client().admin().indices().prepareGetSettings(idx).get().getSetting(idx, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");
Path segmentMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/metadata");
Path segmentDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/data");
Client client = client();
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);

try (
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
@@ -23,6 +23,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
@@ -57,7 +58,11 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -182,13 +187,9 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");

String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
@@ -212,12 +213,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
@@ -231,12 +228,8 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
@@ -250,12 +243,9 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
@@ -589,12 +579,8 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
flushAndRefresh(INDEX_NAME);

// 3. Delete data from remote segment store
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(segmentDataPath)) {
files.forEach(p -> {
@@ -11,6 +11,7 @@
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -22,7 +23,10 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
@@ -45,8 +49,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
assertEquals(1, response.getShards().length);

String indexName = response.getShards()[0].getShardRouting().index().getName();
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataRepoPath = location.resolve(shardPath);
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

logger.info("--> Verify that the segment files are same on local and repository eventually");
@@ -14,9 +14,14 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
@@ -27,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -307,7 +314,21 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, "snap1");
SnapshotInfo snapshotInfo2 = createFullSnapshot(snapshotRepoName, "snap2");

String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
client(),
remoteStoreEnabledIndexName,
remoteStoreRepository.basePath(),
"0",
SEGMENTS,
LOCK_FILES
);
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
String[] lockFiles;
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
lockFiles = lockDirectory.listAll();
}
assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

// delete remote store index
@@ -320,7 +341,9 @@
.get();
assertAcked(deleteSnapshotResponse);

lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
lockFiles = lockDirectory.listAll();
}
assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);
assertTrue(lockFiles[0].contains(snapshotInfo2.snapshotId().getUUID()));

@@ -103,9 +103,24 @@ boolean requiresHashAlgorithm() {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
// throw new UnsupportedOperationException("Not implemented"); --> Not using this for unblocking couple of tests.
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return addPrefix(pathInput.basePath(), hashAlgorithm.hash(pathInput)).add(pathInput.indexUUID())
.add(pathInput.shardId())
.add(pathInput.dataCategory().getName())
.add(pathInput.dataType().getName());
}

@Override
boolean requiresHashAlgorithm() {
return true;
}
},
HASHED_INFIX(2) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath()
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.indexUUID())
.add(pathInput.shardId())
.add(pathInput.dataCategory().getName())
@@ -200,10 +215,11 @@ public enum PathHashAlgorithm {

FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
.getName();
return FNV1a.hash32(input);
long hash = FNV1a.hash64(input);
return RemoteStoreUtils.longToBase64(hash);
}
};

@@ -218,6 +234,7 @@ public int getCode() {
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;

static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
@@ -240,7 +257,7 @@ public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);
abstract String hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
try {
Expand All @@ -257,4 +274,13 @@ public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
*/
public static final String NAME = "path_hash_algorithm";
}

static BlobPath addPrefix(BlobPath blobPath, String prefix) {
BlobPath updatedPath = BlobPath.cleanPath();
updatedPath = updatedPath.add(prefix);
for (String path : blobPath) {
updatedPath = updatedPath.add(path);
}
return updatedPath;
}
}
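
For context, the layouts these three path types produce can be seen with a small standalone sketch (not part of the commit). The base path, index UUID, and hash token below are made-up sample values, and BlobPath is reduced to plain string joining; a real hash token is base64(FNV-1a 64-bit) of indexUUID + shardId + dataCategory + dataType, as computed by FNV_1A.hash() above.

import java.util.List;

// Sketch of the shard-level layouts produced by the three PathType values.
public class PathTypeLayoutSketch {

    static String join(List<String> parts) {
        return String.join("/", parts) + "/";
    }

    public static void main(String[] args) {
        String base = "repo-base";    // repository base path (assumed)
        String uuid = "idx-uuid";     // index UUID (assumed)
        String hash = "A9plcXKkaZs";  // sample hash token (assumed)

        // FIXED:         <base>/<uuid>/<shard>/<category>/<type>/
        System.out.println(join(List.of(base, uuid, "0", "translog", "data")));
        // HASHED_PREFIX: <hash>/<base>/<uuid>/<shard>/<category>/<type>/
        System.out.println(join(List.of(hash, base, uuid, "0", "translog", "data")));
        // HASHED_INFIX:  <base>/<hash>/<uuid>/<shard>/<category>/<type>/
        System.out.println(join(List.of(base, hash, uuid, "0", "translog", "data")));
    }
}

The HASHED_PREFIX form front-loads entropy into the key, which spreads shard data across key prefixes on object stores (for example S3) that partition request throughput by prefix.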
@@ -10,7 +10,9 @@

import org.opensearch.common.collect.Tuple;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -101,4 +103,26 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
});
}

/**
* Converts an input hash that occupies 64 bits of space into a Base64 (6 bits per character) string. This must not
* be changed, as it is used for creating the path for storing remote store data on the remote store.
*/
static String longToBase64(long value) {
byte[] hashBytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
return base64(hashBytes);
}

/**
* Converts the byte array to a base64 string. `/` is replaced with `_`, `+` is replaced with `-`, and the trailing
* `=` padding is removed. These characters are either used as delimiters or are special characters requiring
* special handling by some vendors. The characters present in this base64 variant are [A-Za-z0-9_-].
* This must not be changed, as it is used for creating the path for storing remote store data on the remote store.
*
* @param bytes byte array
* @return base 64 string.
*/
private static String base64(byte[] bytes) {
String base64 = Base64.getEncoder().encodeToString(bytes);
return base64.substring(0, base64.length() - 1).replace('/', '_').replace('+', '-');
}
}
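
Since the helpers above are package-private, here is a standalone replica (an illustrative sketch, not part of the commit) that can be run to see the shape of the output; the sample inputs are arbitrary.

import java.nio.ByteBuffer;
import java.util.Base64;

// Mirrors RemoteStoreUtils.longToBase64: 8 bytes encode to 12 base64 chars
// with one '=' pad; the pad is dropped and '/' and '+' are swapped for '_'
// and '-', yielding an 11-character path-safe token over [A-Za-z0-9_-].
public class LongToBase64Sketch {

    static String longToBase64(long value) {
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        String base64 = Base64.getEncoder().encodeToString(bytes);
        return base64.substring(0, base64.length() - 1).replace('/', '_').replace('+', '-');
    }

    public static void main(String[] args) {
        System.out.println(longToBase64(0L));              // AAAAAAAAAAA
        System.out.println(longToBase64(Long.MIN_VALUE));  // gAAAAAAAAAA
        System.out.println(longToBase64(-1L));             // __________8
    }
}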
@@ -312,7 +312,7 @@ public class IndicesService extends AbstractLifecycleComponent
*/
public static final Setting<PathType> CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index.path.prefix.type",
PathType.FIXED.toString(),
PathType.HASHED_PREFIX.toString(),
PathType::parseString,
Property.NodeScope,
Property.Dynamic
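
With the default flipped from FIXED to HASHED_PREFIX, clusters that depend on the old flat layout now have to pin the setting explicitly. A hedged sketch mirroring the transient-settings call from the integration test earlier in this diff; it assumes a connected Client instance named `client` and the same static imports used there.

// Pin the pre-change path layout; mirrors the prepareUpdateSettings call above.
client.admin()
    .cluster()
    .prepareUpdateSettings()
    .setTransientSettings(
        Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED)
    )
    .get();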