Skip to content

Commit

Permalink
Refactor record and service to make them generic
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Feb 2, 2024
1 parent 081340c commit cd805a2
Show file tree
Hide file tree
Showing 36 changed files with 1,287 additions and 2,137 deletions.
2 changes: 0 additions & 2 deletions plugins/query-insights/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
apply plugin: 'opensearch.java-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'OpenSearch Query Insights Plugin.'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -28,6 +30,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -74,15 +77,69 @@ public void testQueryInsightPluginInstalled() {
* Test get top queries when feature disabled
*/
public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled.",
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
response.failures().get(0).getCause().getCause().getMessage()
);
}

/**
* Test update top query record when feature enabled
*/
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();

logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertFalse(health.isTimedOut());

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
);
ensureStableCluster(2);
logger.info("--> creating indices for query insight testing");
for (int i = 0; i < 5; i++) {
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
assertEquals("CREATED", response.status().toString());
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
response.failures().get(0).getCause().getCause().getMessage()
);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet();
Assert.assertEquals(0, response2.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size());
for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) {
Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size());
}

internalCluster().stopAllNodes();
}

/**
* Test get top queries when feature enabled
*/
Expand All @@ -93,7 +150,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -118,11 +175,11 @@ public void testGetTopQueriesWhenFeatureEnabled() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}
Expand All @@ -137,7 +194,7 @@ public void testGetTopQueriesWithSmallTopN() {
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -162,12 +219,11 @@ public void testGetTopQueriesWithSmallTopN() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
// TODO: this should be 1 after changing to cluster level top N.
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}
Expand All @@ -179,10 +235,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "0ms")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -207,11 +263,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(0, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());

internalCluster().stopAllNodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.insights.core.listener.SearchQueryLatencyListener;
import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService;
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
Expand All @@ -35,6 +37,8 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -66,10 +70,20 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// create top n queries service
TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client);
// top n queries listener
SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(clusterService, topQueriesByLatencyService);
return List.of(topQueriesByLatencyService, searchQueryLatencyListener);
QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return List.of(

Check warning on line 79 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java#L79

Added line #L79 was not covered by tests
new ScalingExecutorBuilder(
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR,
1,
Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT),
TimeValue.timeValueMinutes(5)

Check warning on line 84 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java#L83-L84

Added lines #L83 - L84 were not covered by tests
)
);
}

@Override
Expand All @@ -96,11 +110,7 @@ public List<Setting<?>> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
);
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit cd805a2

Please sign in to comment.