From 5afc24f7852a672e3fc76cbd351ecbae796c6b33 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Sat, 4 Mar 2023 14:13:53 +0530 Subject: [PATCH] [Weighted Routing] Add support to allow custom string search (#6335) Signed-off-by: Anshu Agarwal --- .../search/SearchWeightedRoutingIT.java | 117 +++++++++++++++--- .../routing/IndexShardRoutingTable.java | 43 +++++-- .../cluster/routing/OperationRouting.java | 20 ++- .../structure/RoutingIteratorTests.java | 72 +++++++++++ 4 files changed, 223 insertions(+), 29 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 02b51f9e625d3..1482e73efdace 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -803,9 +803,9 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio } /** - * Assert that preference based search is not allowed with strict weighted shard routing + * Assert that preference search with custom string doesn't hit a node in weighed away az */ - public void testStrictWeightedRouting() { + public void testStrictWeightedRoutingWithCustomString() { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") @@ -817,24 +817,40 @@ public void testStrictWeightedRouting() { int nodeCountPerAZ = 1; Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); - int numShards = 10; - int numReplicas = 1; + int numShards = 20; + int numReplicas = 2; setUpIndexing(numShards, numReplicas); logger.info("--> setting shard routing weights for weighted round robin"); Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); setShardRoutingWeights(weights); - String nodeInZoneA = nodeMap.get("a").get(0); String customPreference = randomAlphaOfLength(10); - assertThrows( - PreferenceBasedSearchNotAllowedException.class, - () -> internalCluster().client(nodeMap.get("b").get(0)) - .prepareSearch() - .setSize(0) - .setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference)) - .get() - ); + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(20) + .setPreference(customPreference) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + assertNoSearchInAZ("c"); + assertSearchInAZ("a"); + assertSearchInAZ("b"); + + // disable strict weighed routing + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put("cluster.routing.weighted.strict", false)) + .get(); + + // make search requests with custom string + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + .prepareSearch() + .setSize(20) + .setPreference(customPreference) + .get(); + // assert search on data nodes on az c (weighed away az) + assertSearchInAZ("c"); } @@ -862,13 +878,86 @@ public void testPreferenceSearchWithWeightedRouting() { String customPreference = randomAlphaOfLength(10); String nodeInZoneA = nodeMap.get("a").get(0); + String nodeInZoneB = nodeMap.get("b").get(0); + String nodeInZoneC = nodeMap.get("c").get(0); + + Map nodeIDMap = new HashMap<>(); + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) .prepareSearch() .setSize(0) - .setPreference(randomFrom("_local", "_only_nodes:" + nodeInZoneA, "_prefer_nodes:" + nodeInZoneA, customPreference)) + .setPreference("_local") + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference( + "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) + ) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_prefer_nodes:zone:a") + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + } + + /** + * Assert that preference based search with preference type is not allowed with strict weighted shard routing + */ + public void testStrictWeightedRouting() { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + String nodeInZoneA = nodeMap.get("a").get(0); + + assertThrows( + PreferenceBasedSearchNotAllowedException.class, + () -> internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_local").get() + ); + + assertThrows( + PreferenceBasedSearchNotAllowedException.class, + () -> internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_only_nodes:" + nodeInZoneA) + .get() + ); + + assertThrows( + PreferenceBasedSearchNotAllowedException.class, + () -> internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_prefer_nodes:" + nodeInZoneA) + .get() + ); + } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index f730a2833fd02..6a877838ece95 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -327,14 +327,7 @@ public ShardIterator activeInitializingShardsWeightedIt( boolean isFailOpenEnabled ) { final int seed = shufflerForWeightedRouting.nextSeed(); - List ordered = new ArrayList<>(); - List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); - List orderedListWithDistinctShards; - ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed)); - if (!allInitializingShards.isEmpty()) { - List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); - ordered.addAll(orderedInitializingShards); - } + List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); // append shards for attribute value with weight zero, so that shard search requests can be tried on // shard copies in case of request failure from other attribute values. @@ -357,8 +350,40 @@ public ShardIterator activeInitializingShardsWeightedIt( logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); } } + return new PlainShardIterator(shardId, ordered); + } + + private List activeInitializingShardsWithWeights( + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight, + int seed + ) { + List ordered = new ArrayList<>(); + List orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); + ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed)); + if (!allInitializingShards.isEmpty()) { + List orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); + ordered.addAll(orderedInitializingShards); + } + List orderedListWithDistinctShards; orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList()); - return new PlainShardIterator(shardId, orderedListWithDistinctShards); + return orderedListWithDistinctShards; + } + + /** + * Returns an iterator over active and initializing shards, shards are ordered by weighted + * round-robin scheduling policy. Uses the passed seed to shuffle the shards. + * + */ + public ShardIterator activeInitializingShardsSimpleWeightedIt( + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight, + int seed + ) { + List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); + return new PlainShardIterator(shardId, ordered); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index cb20e223c9e20..b247936245151 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -281,11 +281,7 @@ private ShardIterator preferenceActiveShardIterator( if (preference == null || preference.isEmpty()) { return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); } - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { - throw new PreferenceBasedSearchNotAllowedException( - "Preference based routing not allowed with strict weighted shard routing setting" - ); - } + if (preference.charAt(0) == '_') { Preference preferenceType = Preference.parse(preference); if (preferenceType == Preference.SHARDS) { @@ -318,6 +314,11 @@ private ShardIterator preferenceActiveShardIterator( } } preferenceType = Preference.parse(preference); + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { + throw new PreferenceBasedSearchNotAllowedException( + "Preference type based routing not allowed with strict weighted shard routing enabled" + ); + } switch (preferenceType) { case PREFER_NODES: final Set nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")) @@ -343,7 +344,14 @@ private ShardIterator preferenceActiveShardIterator( // for a different element in the list by also incorporating the // shard ID into the hash of the user-supplied preference key. routingHash = 31 * routingHash + indexShard.shardId.hashCode(); - if (ignoreAwarenessAttributes()) { + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { + return indexShard.activeInitializingShardsSimpleWeightedIt( + weightedRoutingMetadata.getWeightedRouting(), + nodes, + getWeightedRoutingDefaultWeight(), + routingHash + ); + } else if (ignoreAwarenessAttributes()) { return indexShard.activeInitializingShardsIt(routingHash); } else { return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash); diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 9715d3af09fc7..866939e6ac3d7 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -860,4 +860,76 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { terminate(threadPool); } } + + /** + * Test to validate that simple weighted shard routing with seed return same shard routing on each call + */ + public void testActiveInitializingShardsSimpleWeightedIt() { + TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsSimpleWeightedIt"); + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + List> weightsList = new ArrayList<>(); + Map weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); + weightsList.add(weights1); + + WeightedRouting weightedRouting = new WeightedRouting("zone", weights1); + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1); + + ShardRouting shardRouting1 = shardIterator.nextOrNull(); + + for (int i = 0; i < 50; i++) { + + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1); + + ShardRouting shardRouting2 = shardIterator.nextOrNull(); + + assertEquals(shardRouting1.currentNodeId(), shardRouting2.currentNodeId()); + } + + } finally { + terminate(threadPool); + } + } }