Skip to content

Commit

Permalink
Query grouping framework for Top N queries and group by query similar…
Browse files Browse the repository at this point in the history
…ity (#66)

* Query grouping framework and group by query similarity

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Spotless apply

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Build fix

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Properly configure settings update consumer

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Address review comments

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Refactor unit tests

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Decouple Measurement and MetricType

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Aggregate type NONE will ensure no aggregations computed

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Perform renaming

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Integrate query shape library with grouping

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Spotless

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Create and consume string hashcode interface

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Health checks in code

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Fix tests and spotless apply

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Minor fixes

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Max groups setting and unit tests

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Address review comments

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Address review comments

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Create query grouper interface and top query store interface

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Address review comments

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Removed unused interface

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Rebase main and spotless

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Renaming variable

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Remove TopQueriesStore interface

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Drain top queries service on group change

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Rename max groups setting and allow minimum 0

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Make write/read from io backword compatible

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Minor fix

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

* Refactor query grouper

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>

---------

Signed-off-by: Siddhant Deshmukh <deshsid@amazon.com>
(cherry picked from commit 65e4489)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Sep 5, 2024
1 parent 98055c1 commit bba2b66
Show file tree
Hide file tree
Showing 23 changed files with 1,808 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY,
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.plugin.insights.core.listener;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;
Expand All @@ -31,7 +33,9 @@
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -101,6 +105,26 @@ public QueryInsightsListener(
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}

// Settings endpoints set for grouping top n queries
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_GROUP_BY,
v -> this.queryInsightsService.setGrouping(v),
v -> this.queryInsightsService.validateGrouping(v)
);
this.queryInsightsService.validateGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));
this.queryInsightsService.setGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
v -> this.queryInsightsService.setMaximumGroups(v),
v -> this.queryInsightsService.validateMaximumGroups(v)
);
this.queryInsightsService.validateMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));
this.queryInsightsService.setMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));

// Settings endpoints set for search query metrics
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, v -> setSearchQueryMetricsEnabled(v));
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));
Expand Down Expand Up @@ -191,32 +215,40 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final

final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
Map<MetricType, Measurement> measurements = new HashMap<>();
if (shouldCollect(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
new Measurement(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()))
);
}
if (shouldCollect(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
)
);
}
if (shouldCollect(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
)
);
}

String hashcode = QueryShapeGenerator.getShapeHashCodeAsString(request.source(), false);

Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);
attributes.put(Attribute.QUERY_HASHCODE, hashcode);

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -27,6 +28,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -73,6 +75,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

/**
* Flags for enabling insight data grouping for different metric types
*/
private GroupingType groupingType;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
Expand Down Expand Up @@ -112,16 +119,17 @@ public QueryInsightsService(

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
this.enableSearchQueryMetricsFeature(false);
this.groupingType = DEFAULT_GROUPING_TYPE;
}

/**
* Ingest the query data into in-memory stores
*
* @param record the record to ingest
* @return SearchQueryRecord
* @return true/false
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = searchQueryMetricsEnabled;
boolean shouldAdd = isSearchQueryMetricsFeatureEnabled() || isGroupingEnabled();
if (!shouldAdd) {
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
Expand Down Expand Up @@ -185,6 +193,67 @@ public void enableCollection(final MetricType metricType, final boolean enable)
this.topQueriesServices.get(metricType).setEnabled(enable);
}

/**
* Validate grouping given grouping type setting
* @param groupingTypeSetting grouping setting
*/
public void validateGrouping(final String groupingTypeSetting) {
GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
}

/**
* Set grouping
* @param groupingTypeSetting grouping
*/
public void setGrouping(final String groupingTypeSetting) {
GroupingType newGroupingType = GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
GroupingType oldGroupingType = groupingType;

if (oldGroupingType != newGroupingType) {
groupingType = newGroupingType;

for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setGrouping(newGroupingType);
}
}
}

/**
* Set max number of groups
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void setMaximumGroups(final int maxGroups) {
for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setMaxGroups(maxGroups);
}
}

/**
* Validate max number of groups. Should be between 1 and MAX_GROUPS_LIMIT
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void validateMaximumGroups(final int maxGroups) {
if (maxGroups < 0 || maxGroups > QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT) {
throw new IllegalArgumentException(
"Max groups setting"
+ " should be between 0 and "
+ QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT
+ ", was ("
+ maxGroups
+ ")"
);
}
}

/**
* Get the grouping type based on the metricType
* @return GroupingType
*/

public GroupingType getGrouping() {
return groupingType;
}

/**
* Get if the Query Insights data collection is enabled for a MetricType
*
Expand Down Expand Up @@ -226,9 +295,18 @@ public boolean isSearchQueryMetricsFeatureEnabled() {
return this.searchQueryMetricsEnabled;
}

/**
* Is grouping feature enabled and TopN feature enabled
* @return boolean
*/
public boolean isGroupingEnabled() {
return this.groupingType != GroupingType.NONE && isTopNFeatureEnabled();
}

/**
* Enable/Disable search query metrics feature.
* @param enable enable/disable search query metrics feature
* Stops query insights service if no features enabled
*/
public void enableSearchQueryMetricsFeature(boolean enable) {
searchQueryMetricsEnabled = enable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,6 +35,10 @@
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.exporter.SinkType;
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
import org.opensearch.plugin.insights.rules.model.AggregationType;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -66,7 +70,7 @@ public class TopQueriesService {
/**
* The internal thread-safe store that holds the top n queries insight data
*/
private final PriorityQueue<SearchQueryRecord> topQueriesStore;
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;

/**
* The AtomicReference of a snapshot of the current window top queries for getters to consume
Expand All @@ -93,6 +97,8 @@ public class TopQueriesService {
*/
private QueryInsightsExporter exporter;

private QueryGrouper queryGrouper;

TopQueriesService(
final MetricType metricType,
final ThreadPool threadPool,
Expand All @@ -106,9 +112,16 @@ public class TopQueriesService {
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
this.windowStart = -1L;
this.exporter = null;
topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
queryGrouper = new MinMaxHeapQueryGrouper(
metricType,
QueryInsightsSettings.DEFAULT_GROUPING_TYPE,
AggregationType.AVERAGE,
topQueriesStore,
topNSize
);
}

/**
Expand All @@ -118,6 +131,7 @@ public class TopQueriesService {
*/
public void setTopNSize(final int topNSize) {
this.topNSize = topNSize;
this.queryGrouper.updateTopNSize(topNSize);
}

/**
Expand Down Expand Up @@ -169,6 +183,20 @@ public void setWindowSize(final TimeValue windowSize) {
this.windowStart = -1L;
}

public void setGrouping(final GroupingType groupingType) {
boolean changed = queryGrouper.setGroupingType(groupingType);
if (changed) {
drain();
}
}

public void setMaxGroups(final int maxGroups) {
boolean changed = queryGrouper.setMaxGroups(maxGroups);
if (changed) {
drain();
}
}

/**
* Validate if the window size is valid, based on internal constrains.
*
Expand Down Expand Up @@ -306,10 +334,16 @@ void consumeRecords(final List<SearchQueryRecord> records) {
}

private void addToTopNStore(final List<SearchQueryRecord> records) {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
for (SearchQueryRecord record : records) {
queryGrouper.add(record);
}
} else {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
}
}
}

Expand All @@ -329,6 +363,9 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
}
topQueriesHistorySnapshot.set(history);
topQueriesStore.clear();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
queryGrouper.drain();
}
topQueriesCurrentSnapshot.set(new ArrayList<>());
windowStart = newWindowStart;
// export to the configured sink
Expand Down Expand Up @@ -368,4 +405,13 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {
public void close() throws IOException {
queryInsightsExporterFactory.closeExporter(this.exporter);
}

/**
* Drain internal stores.
*/
private void drain() {
topQueriesStore.clear();
topQueriesHistorySnapshot.set(new ArrayList<>());
topQueriesCurrentSnapshot.set(new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public static MurmurHash3.Hash128 getShapeHashCode(SearchSourceBuilder source, B
return MurmurHash3.hash128(shapeBytes.bytes, 0, shapeBytes.length, 0, new MurmurHash3.Hash128());
}

public static String getShapeHashCodeAsString(SearchSourceBuilder source, Boolean showFields) {
MurmurHash3.Hash128 hashcode = getShapeHashCode(source, showFields);
String hashAsString = Long.toHexString(hashcode.h1) + Long.toHexString(hashcode.h2);
return hashAsString;
}

/**
* Method to build search query shape given a source
* @param source search request source
Expand Down
Loading

0 comments on commit bba2b66

Please sign in to comment.