Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transport action for bulk async shard fetch for primary shards #8218

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a9bc1c4
Added transport action for bulk async shard fetch for primary shards
sudarshan-baliga Jul 24, 2023
13b026d
Add PSA Async batch shard fetch transport integ test
sudarshan-baliga Jul 31, 2023
8404fc8
Refactor PSA Async batch shard fetch transport integ test
sudarshan-baliga Aug 3, 2023
767c47b
Merge branch 'main' into async-shard-fetch-psatransport
sudarshan-baliga Aug 4, 2023
d80f441
Update the documentation of TransportNodesListGatewayStartedShardsBatch.
sudarshan-baliga Aug 6, 2023
b8b5d88
Merge branch 'main' into async-shard-fetch-psatransport
sudarshan-baliga Aug 6, 2023
06e1243
Transport PSA change
shiv0408 Sep 25, 2023
933bcfc
Renamed Transport File to TransportNodesListGatewayStartedBatchShards
shiv0408 Sep 25, 2023
1c381dc
Removed AsyncBatchShardFetch
shiv0408 Sep 26, 2023
b8a141d
Renamed test files
shiv0408 Sep 26, 2023
84f63ac
Removed changes part of other PRs
shiv0408 Sep 26, 2023
ffd6031
Corrected the references of TransportNodesListGatewayStartedBatchShards
shiv0408 Sep 28, 2023
bc84bd9
Rename NodeGatewayStartedShards to NodeGatewayStartedShard
shiv0408 Dec 11, 2023
94f27cc
modify the request signature
shiv0408 Dec 12, 2023
4a62b19
Remove ShardAttributes because added in #8742
shiv0408 Dec 12, 2023
5c1f446
Added a Helper class to remove code duplication
shiv0408 Dec 12, 2023
0ff9a02
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Dec 12, 2023
93a548b
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Jan 8, 2024
89c1143
Fix build failure after main merge
shiv0408 Jan 8, 2024
55e43cf
Renamed TransportNodesListGatewayStartedShardsBatch
shiv0408 Jan 25, 2024
f8fb3cd
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Jan 25, 2024
551fc60
Address review comment
shiv0408 Jan 25, 2024
b57c330
Move integration tests to RecoveryFromGatewayIT
shiv0408 Jan 29, 2024
1dbd248
Added UsingBatchAction suffix to ITs
shiv0408 Feb 1, 2024
f8d8a6a
Fixed comment
shiv0408 Feb 1, 2024
d1e90a1
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Feb 1, 2024
8f113fc
empty commit to trigger workflow
shiv0408 Feb 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.shard.ShardPath;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster;
import static org.opensearch.test.OpenSearchIntegTestCase.resolveIndex;

public class AsyncShardFetchTestUtils {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved

public static DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>(clusterStateResponse.getState().nodes().getDataNodes().values());
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
return disNodesArr;
}

public static Map<ShardId, String> prepareRequestMap(String[] indices, int shardCount) {
Map<ShardId, String> shardIdCustomDataPathMap = new HashMap<>();
for (String indexName : indices) {
final Index index = resolveIndex(indexName);
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(
client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName)
);
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
for (int shardIdNum = 0; shardIdNum < shardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdCustomDataPathMap.put(shardId, customDataPath);
}
}
return shardIdCustomDataPathMap;
}

public static void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
Files.delete(item);
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;

public class TransportNodesListGatewayStartedBatchShardsIT extends OpenSearchIntegTestCase {

public void testSingleShardFetch() throws Exception {
String indexName = "test";
Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(new String[] { indexName }, 1);

ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();

TransportNodesListGatewayStartedBatchShards.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedBatchShards.class),
new TransportNodesListGatewayStartedBatchShards.Request(searchShardsResponse.getNodes(), shardIdCustomDataPathMap)
);
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards nodeGatewayStartedShards = response.getNodesMap()
.get(searchShardsResponse.getNodes()[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
}

public void testShardFetchMultiNodeMultiIndexes() throws Exception {
// start second node
internalCluster().startNode();
String indexName1 = "test1";
String indexName2 = "test2";
// assign one primary shard each to the data nodes
Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(
new String[] { indexName1, indexName2 },
internalCluster().numDataNodes()
);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
assertEquals(internalCluster().numDataNodes(), searchShardsResponse.getNodes().length);
TransportNodesListGatewayStartedBatchShards.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedBatchShards.class),
new TransportNodesListGatewayStartedBatchShards.Request(searchShardsResponse.getNodes(), shardIdCustomDataPathMap)
);
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
assertEquals(1, clusterSearchShardsGroup.getShards().length);
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards nodeGatewayStartedShards = response.getNodesMap()
.get(nodeId)
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
}
}

public void testShardFetchCorruptedShards() throws Exception {
String indexName = "test";
Map<ShardId, String> shardIdCustomDataPathMap = prepareRequestMap(new String[] { indexName }, 1);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
TransportNodesListGatewayStartedBatchShards.NodesGatewayStartedShardsBatch response;
internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName());
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedBatchShards.class),
new TransportNodesListGatewayStartedBatchShards.Request(getDiscoveryNodes(), shardIdCustomDataPathMap)
);
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards nodeGatewayStartedShards = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
}

private DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>(clusterStateResponse.getState().nodes().getDataNodes().values());
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
return disNodesArr;
}

private void assertNodeGatewayStartedShardsHappyCase(
TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShards nodeGatewayStartedShards
) {
assertNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
createIndex(
indexName,
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
index(indexName, "type", "1");
flush(indexName);
}

private Map<ShardId, String> prepareRequestMap(String[] indices, int primaryShardCount) {
Map<ShardId, String> shardIdCustomDataPathMap = new HashMap<>();
for (String indexName : indices) {
prepareIndex(indexName, primaryShardCount);
final Index index = resolveIndex(indexName);
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(
client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName)
);
for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdCustomDataPathMap.put(shardId, customDataPath);
}
}
return shardIdCustomDataPathMap;
}

private void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
logger.info("--> deleting [{}]", item);
Files.delete(item);
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ protected void configure() {
bind(GatewayService.class).asEagerSingleton();
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedBatchShards.class).asEagerSingleton();
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
bind(LocalAllocateDangledIndices.class).asEagerSingleton();
}
}
Loading
Loading