Skip to content

Commit

Permalink
[Weighted Routing] Add support to allow custom string search (opensea…
Browse files Browse the repository at this point in the history
…rch-project#6335)

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
anshu1106 authored Mar 4, 2023
1 parent 7da292b commit 5afc24f
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,9 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio
}

/**
* Assert that preference based search is not allowed with strict weighted shard routing
* Assert that preference search with custom string doesn't hit a node in weighed away az
*/
public void testStrictWeightedRouting() {
public void testStrictWeightedRoutingWithCustomString() {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand All @@ -817,24 +817,40 @@ public void testStrictWeightedRouting() {
int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
int numShards = 20;
int numReplicas = 2;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);
String nodeInZoneA = nodeMap.get("a").get(0);
String customPreference = randomAlphaOfLength(10);

assertThrows(
PreferenceBasedSearchNotAllowedException.class,
() -> internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference))
.get()
);
SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(20)
.setPreference(customPreference)
.get();
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());
assertNoSearchInAZ("c");
assertSearchInAZ("a");
assertSearchInAZ("b");

// disable strict weighed routing
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.weighted.strict", false))
.get();

// make search requests with custom string
searchResponse = internalCluster().client(nodeMap.get("a").get(0))
.prepareSearch()
.setSize(20)
.setPreference(customPreference)
.get();
// assert search on data nodes on az c (weighed away az)
assertSearchInAZ("c");

}

Expand Down Expand Up @@ -862,13 +878,86 @@ public void testPreferenceSearchWithWeightedRouting() {

String customPreference = randomAlphaOfLength(10);
String nodeInZoneA = nodeMap.get("a").get(0);
String nodeInZoneB = nodeMap.get("b").get(0);
String nodeInZoneC = nodeMap.get("c").get(0);

Map<String, String> nodeIDMap = new HashMap<>();
DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference))
.setPreference("_local")
.get();
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());

searchResponse = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference(
"_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC)
)
.get();
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());

searchResponse = internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference("_prefer_nodes:zone:a")
.get();
assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus());
}

/**
* Assert that preference based search with preference type is not allowed with strict weighted shard routing
*/
public void testStrictWeightedRouting() {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.weighted.fail_open", true)
.put("cluster.routing.weighted.strict", true)
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
setUpIndexing(numShards, numReplicas);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
setShardRoutingWeights(weights);
String nodeInZoneA = nodeMap.get("a").get(0);

assertThrows(
PreferenceBasedSearchNotAllowedException.class,
() -> internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_local").get()
);

assertThrows(
PreferenceBasedSearchNotAllowedException.class,
() -> internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference("_only_nodes:" + nodeInZoneA)
.get()
);

assertThrows(
PreferenceBasedSearchNotAllowedException.class,
() -> internalCluster().client(nodeMap.get("b").get(0))
.prepareSearch()
.setSize(0)
.setPreference("_prefer_nodes:" + nodeInZoneA)
.get()
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,7 @@ public ShardIterator activeInitializingShardsWeightedIt(
boolean isFailOpenEnabled
) {
final int seed = shufflerForWeightedRouting.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
List<ShardRouting> orderedListWithDistinctShards;
ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
// shard copies in case of request failure from other attribute values.
Expand All @@ -357,8 +350,40 @@ public ShardIterator activeInitializingShardsWeightedIt(
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}
return new PlainShardIterator(shardId, ordered);
}

private List<ShardRouting> activeInitializingShardsWithWeights(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
int seed
) {
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
List<ShardRouting> orderedListWithDistinctShards;
orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList());
return new PlainShardIterator(shardId, orderedListWithDistinctShards);
return orderedListWithDistinctShards;
}

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted
* round-robin scheduling policy. Uses the passed seed to shuffle the shards.
*
*/
public ShardIterator activeInitializingShardsSimpleWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
int seed
) {
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);
return new PlainShardIterator(shardId, ordered);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,7 @@ private ShardIterator preferenceActiveShardIterator(
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata);
}
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference based routing not allowed with strict weighted shard routing setting"
);
}

if (preference.charAt(0) == '_') {
Preference preferenceType = Preference.parse(preference);
if (preferenceType == Preference.SHARDS) {
Expand Down Expand Up @@ -318,6 +314,11 @@ private ShardIterator preferenceActiveShardIterator(
}
}
preferenceType = Preference.parse(preference);
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference type based routing not allowed with strict weighted shard routing enabled"
);
}
switch (preferenceType) {
case PREFER_NODES:
final Set<String> nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(","))
Expand All @@ -343,7 +344,14 @@ private ShardIterator preferenceActiveShardIterator(
// for a different element in the list by also incorporating the
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
if (ignoreAwarenessAttributes()) {
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
return indexShard.activeInitializingShardsSimpleWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
routingHash
);
} else if (ignoreAwarenessAttributes()) {
return indexShard.activeInitializingShardsIt(routingHash);
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,4 +860,76 @@ public void testWeightedRoutingShardStateWithDifferentWeights() {
terminate(threadPool);
}
}

/**
* Test to validate that simple weighted shard routing with seed return same shard routing on each call
*/
public void testActiveInitializingShardsSimpleWeightedIt() {
TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsSimpleWeightedIt");
try {
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.awareness.attributes", "zone");
AllocationService strategy = createAllocationService(settings.build());

Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.build();

ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);

Map<String, String> node1Attributes = new HashMap<>();
node1Attributes.put("zone", "zone1");
Map<String, String> node2Attributes = new HashMap<>();
node2Attributes.put("zone", "zone2");
Map<String, String> node3Attributes = new HashMap<>();
node3Attributes.put("zone", "zone3");
clusterState = ClusterState.builder(clusterState)
.nodes(
DiscoveryNodes.builder()
.add(newNode("node1", unmodifiableMap(node1Attributes)))
.add(newNode("node2", unmodifiableMap(node2Attributes)))
.add(newNode("node3", unmodifiableMap(node3Attributes)))
.localNodeId("node1")
)
.build();
clusterState = strategy.reroute(clusterState, "reroute");

clusterState = startInitializingShardsAndReroute(strategy, clusterState);
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
List<Map<String, Double>> weightsList = new ArrayList<>();
Map<String, Double> weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0);
weightsList.add(weights1);

WeightedRouting weightedRouting = new WeightedRouting("zone", weights1);
ShardIterator shardIterator = clusterState.routingTable()
.index("test")
.shard(0)
.activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1);

ShardRouting shardRouting1 = shardIterator.nextOrNull();

for (int i = 0; i < 50; i++) {

shardIterator = clusterState.routingTable()
.index("test")
.shard(0)
.activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1);

ShardRouting shardRouting2 = shardIterator.nextOrNull();

assertEquals(shardRouting1.currentNodeId(), shardRouting2.currentNodeId());
}

} finally {
terminate(threadPool);
}
}
}

0 comments on commit 5afc24f

Please sign in to comment.