Skip to content

Commit

Permalink
refactor service for improving multithreading efficiency
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Feb 5, 2024
1 parent 7d2b3ab commit f211425
Show file tree
Hide file tree
Showing 19 changed files with 697 additions and 651 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
"Cannot get top n queries for [latency] when it is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);
}

/**
* Test update top query record when feature enabled
*/
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();

logger.info("--> starting nodes for query insight testing");
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
"Cannot get top n queries for [latency] when it is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);

Expand All @@ -143,7 +143,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
/**
* Test get top queries when feature enabled
*/
public void testGetTopQueriesWhenFeatureEnabled() {
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand Down Expand Up @@ -174,7 +174,8 @@ public void testGetTopQueriesWhenFeatureEnabled() {
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

// Sleep to wait for queue drained to top queries store
Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Expand All @@ -187,7 +188,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
/**
* Test get top queries with small top n size
*/
public void testGetTopQueriesWithSmallTopN() {
public void testGetTopQueriesWithSmallTopN() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1")
Expand Down Expand Up @@ -218,7 +219,7 @@ public void testGetTopQueriesWithSmallTopN() {
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Expand All @@ -231,7 +232,7 @@ public void testGetTopQueriesWithSmallTopN() {
/**
* Test get top queries with small window size
*/
public void testGetTopQueriesWithSmallWindowSize() {
public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
Expand Down Expand Up @@ -267,7 +268,7 @@ public void testGetTopQueriesWithSmallWindowSize() {
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());

Thread.sleep(6000);
internalCluster().stopAllNodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,25 @@ public QueryInsightsPlugin() {}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
// create top n queries service
QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
return List.of(
new ScalingExecutorBuilder(
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR,
Expand All @@ -88,13 +88,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
final Settings settings,
final RestController restController,
final ClusterSettings clusterSettings,
final IndexScopedSettings indexScopedSettings,
final SettingsFilter settingsFilter,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestTopQueriesAction());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

Expand All @@ -34,7 +33,9 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

/**
* The listener for top N queries by latency
* The listener for query insights services.
* It forwards query-related data to the appropriate query insights stores,
* either for each request or for each phase.
*
* @opensearch.internal
*/
Expand All @@ -52,47 +53,55 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) {
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v));
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_SIZE,
this.queryInsightsService::setTopNSize,
this.queryInsightsService::validateTopNSize
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v)
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
this.queryInsightsService::setWindowSize,
this.queryInsightsService::validateWindowSize
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)
);
this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
}

/**
* Enable or disable metric collection for {@link MetricType}
* Enable or disable top queries insights collection for {@link MetricType}
* This function will enable or disable the corresponding listeners
* and query insights services.
*
* @param metricType {@link MetricType}
* @param enabled boolean
*/
public void setEnabled(MetricType metricType, boolean enabled) {
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
this.queryInsightsService.enableCollection(metricType, enabled);

// disable QueryInsightsListener only if collection for all metrics are disabled.
if (!enabled) {
for (MetricType t : MetricType.allMetricTypes()) {
if (this.queryInsightsService.isCollectionEnabled(t)) {
return;
}
// disable QueryInsightsListener only if all metrics collections are disabled now.
if (!queryInsightsService.isEnabled()) {
super.setEnabled(false);
this.queryInsightsService.stop();
}
super.setEnabled(false);
} else {
super.setEnabled(true);
// restart QueryInsightsListener only if none of metrics collections is enabled before.
if (isAllMetricsDisabled) {
this.queryInsightsService.stop();
this.queryInsightsService.start();
}
}

}

@Override
Expand All @@ -113,17 +122,14 @@ public void onPhaseFailure(SearchPhaseContext context) {}
public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
SearchRequest request = context.getRequest();
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
final SearchRequest request = context.getRequest();
try {
Map<MetricType, Measurement<? extends Number>> measurements = new HashMap<>();
Map<MetricType, Number> measurements = new HashMap<>();
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
new Measurement<>(
MetricType.LATENCY.name(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
)
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
);
}
Map<Attribute, Object> attributes = new HashMap<>();
Expand Down
Loading

0 comments on commit f211425

Please sign in to comment.