Skip to content

Commit

Permalink
Add implementation for remote store path types (opensearch-project#13103
Browse files Browse the repository at this point in the history
)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 27, 2024
1 parent aa50a60 commit 3e776fc
Show file tree
Hide file tree
Showing 14 changed files with 623 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand Down Expand Up @@ -54,6 +55,10 @@
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -221,6 +226,11 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
String restoredIndexName1version1 = indexName1 + "-restored-1";
String restoredIndexName1version2 = indexName1 + "-restored-2";

client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.get();
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
Client client = client();
Settings indexSettings = getIndexSettings(1, 0).build();
Expand Down Expand Up @@ -418,12 +428,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
}

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
String indexUUID = client().admin().indices().prepareGetSettings(idx).get().getSetting(idx, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");
Path segmentMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/metadata");
Path segmentDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/data");
Client client = client();
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);

try (
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
Expand Down Expand Up @@ -58,7 +59,11 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
Expand Down Expand Up @@ -183,13 +188,9 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");

String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
Expand All @@ -213,12 +214,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
Expand All @@ -232,12 +229,8 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
Expand All @@ -251,12 +244,9 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
Expand Down Expand Up @@ -590,12 +580,8 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
flushAndRefresh(INDEX_NAME);

// 3. Delete data from remote segment store
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(segmentDataPath)) {
files.forEach(p -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -22,7 +23,10 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
Expand All @@ -45,8 +49,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
assertEquals(1, response.getShards().length);

String indexName = response.getShards()[0].getShardRouting().index().getName();
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
Path segmentDataRepoPath = location.resolve(shardPath);
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

logger.info("--> Verify that the segment files are same on local and repository eventually");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
Expand All @@ -27,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.comparesEqualTo;
Expand Down Expand Up @@ -308,7 +315,21 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, "snap1");
SnapshotInfo snapshotInfo2 = createFullSnapshot(snapshotRepoName, "snap2");

String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
client(),
remoteStoreEnabledIndexName,
remoteStoreRepository.basePath(),
"0",
SEGMENTS,
LOCK_FILES
);
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
String[] lockFiles;
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
lockFiles = lockDirectory.listAll();
}
assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

// delete the giremote store index
Expand All @@ -321,7 +342,9 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
.get();
assertAcked(deleteSnapshotResponse);

lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
lockFiles = lockDirectory.listAll();
}
assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);
assertTrue(lockFiles[0].contains(snapshotInfo2.snapshotId().getUUID()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ public BlobPath add(String path) {
return new BlobPath(Collections.unmodifiableList(paths));
}

/**
* Add additional level of paths to the existing path and returns new {@link BlobPath} with the updated paths.
*/
public BlobPath add(Iterable<String> paths) {
List<String> updatedPaths = new ArrayList<>(this.paths);
paths.iterator().forEachRemaining(updatedPaths::add);
return new BlobPath(Collections.unmodifiableList(updatedPaths));
}

public String buildAsString() {
String p = String.join(SEPARATOR, paths);
if (p.isEmpty() || p.endsWith(SEPARATOR)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,27 @@ boolean requiresHashAlgorithm() {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
// throw new UnsupportedOperationException("Not implemented"); --> Not using this for unblocking couple of tests.
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return BlobPath.cleanPath()
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.basePath())
.add(pathInput.indexUUID())
.add(pathInput.shardId())
.add(pathInput.dataCategory().getName())
.add(pathInput.dataType().getName());
}

@Override
boolean requiresHashAlgorithm() {
return true;
}
},
HASHED_INFIX(2) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath()
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.indexUUID())
.add(pathInput.shardId())
.add(pathInput.dataCategory().getName())
Expand Down Expand Up @@ -200,10 +218,11 @@ public enum PathHashAlgorithm {

FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
.getName();
return FNV1a.hash32(input);
long hash = FNV1a.hash64(input);
return RemoteStoreUtils.longToUrlBase64(hash);
}
};

Expand All @@ -218,6 +237,7 @@ public int getCode() {
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;

static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
Expand All @@ -240,7 +260,7 @@ public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);
abstract String hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.opensearch.common.collect.Tuple;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -101,4 +103,17 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
});
}

/**
* Converts an input hash which occupies 64 bits of space into Base64 (6 bits per character) String. This must not
* be changed as it is used for creating path for storing remote store data on the remote store.
* This converts the byte array to base 64 string. `/` is replaced with `_`, `+` is replaced with `-` and `=`
* which is padded at the last is also removed. These characters are either used as delimiter or special character
* requiring special handling in some vendors. The characters present in this base64 version are [A-Za-z0-9_-].
* This must not be changed as it is used for creating path for storing remote store data on the remote store.
*/
static String longToUrlBase64(long value) {
byte[] hashBytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
String base64Str = Base64.getUrlEncoder().encodeToString(hashBytes);
return base64Str.substring(0, base64Str.length() - 1);
}
}
Loading

0 comments on commit 3e776fc

Please sign in to comment.