From fb6afb2f7c051c1f08b6d4fe663679888b0b9e79 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 5 Sep 2024 12:46:40 -0700 Subject: [PATCH] Query grouping framework for Top N queries and group by query similarity (#66) (#104) * Query grouping framework and group by query similarity * Spotless apply * Build fix * Properly configure settings update consumer * Address review comments * Refactor unit tests * Decouple Measurement and MetricType * Aggregate type NONE will ensure no aggregations computed * Perform renaming * Integrate query shape library with grouping * Spotless * Create and consume string hashcode interface * Health checks in code * Fix tests and spotless apply * Minor fixes * Max groups setting and unit tests * Address review comments * Address review comments * Create query grouper interface and top query store interface * Address review comments * Removed unused interface * Rebase main and spotless * Renaming variable * Remove TopQueriesStore interface * Drain top queries service on group change * Rename max groups setting and allow minimum 0 * Make write/read from io backword compatible * Minor fix * Refactor query grouper --------- (cherry picked from commit 65e4489e782738609f728a43d09f4444331dcee4) Signed-off-by: Siddhant Deshmukh Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../plugin/insights/QueryInsightsPlugin.java | 2 + .../core/listener/QueryInsightsListener.java | 40 +- .../core/service/QueryInsightsService.java | 82 ++- .../core/service/TopQueriesService.java | 60 +- .../categorizer/QueryShapeGenerator.java | 6 + .../SearchQueryAggregationCategorizer.java | 5 +- .../categorizer/SearchQueryCategorizer.java | 9 +- .../SearchQueryCategorizingVisitor.java | 7 +- .../categorizer/SearchQueryCounters.java | 15 +- .../grouper/MinMaxHeapQueryGrouper.java | 276 +++++++ .../core/service/grouper/QueryGrouper.java | 60 ++ 
.../insights/rules/model/AggregationType.java | 20 + .../insights/rules/model/Attribute.java | 6 +- .../insights/rules/model/GroupingType.java | 56 ++ .../insights/rules/model/Measurement.java | 220 ++++++ .../rules/model/SearchQueryRecord.java | 74 +- .../settings/QueryInsightsSettings.java | 26 + .../insights/QueryInsightsPluginTests.java | 2 + .../insights/QueryInsightsTestUtils.java | 104 ++- .../service/QueryInsightsServiceTests.java | 69 ++ .../core/service/TopQueriesServiceTests.java | 36 + .../grouper/MinMaxHeapQueryGrouperTests.java | 685 ++++++++++++++++++ .../top_queries/TopQueriesResponseTests.java | 2 +- 23 files changed, 1808 insertions(+), 54 deletions(-) create mode 100644 src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java create mode 100644 src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/AggregationType.java create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/GroupingType.java create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java create mode 100644 src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 78ab22e..58e835f 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -130,6 +130,8 @@ public List> getSettings() { QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE, QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS, + QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY, + QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, 
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING ); } diff --git a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 00bf997..0cd8412 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -9,6 +9,8 @@ package org.opensearch.plugin.insights.core.listener; import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting; @@ -31,7 +33,9 @@ import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.tasks.Task; @@ -101,6 +105,26 @@ public QueryInsightsListener( this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type))); } + // Settings endpoints set for grouping top n queries + 
clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_QUERIES_GROUP_BY, + v -> this.queryInsightsService.setGrouping(v), + v -> this.queryInsightsService.validateGrouping(v) + ); + this.queryInsightsService.validateGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY)); + this.queryInsightsService.setGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY)); + + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, + v -> this.queryInsightsService.setMaximumGroups(v), + v -> this.queryInsightsService.validateMaximumGroups(v) + ); + this.queryInsightsService.validateMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N)); + this.queryInsightsService.setMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N)); + + // Settings endpoints set for search query metrics clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, v -> setSearchQueryMetricsEnabled(v)); setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING)); @@ -191,25 +215,32 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final final SearchRequest request = context.getRequest(); try { - Map measurements = new HashMap<>(); + Map measurements = new HashMap<>(); if (shouldCollect(MetricType.LATENCY)) { measurements.put( MetricType.LATENCY, - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) + new Measurement(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())) ); } if (shouldCollect(MetricType.CPU)) { measurements.put( MetricType.CPU, - tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum() + new Measurement( + tasksResourceUsages.stream().map(a -> 
a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum() + ) ); } if (shouldCollect(MetricType.MEMORY)) { measurements.put( MetricType.MEMORY, - tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum() + new Measurement( + tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum() + ) ); } + + String hashcode = QueryShapeGenerator.getShapeHashCodeAsString(request.source(), false); + Map attributes = new HashMap<>(); attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT)); attributes.put(Attribute.SOURCE, request.source()); @@ -217,6 +248,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages); + attributes.put(Attribute.QUERY_HASHCODE, hashcode); Map labels = new HashMap<>(); // Retrieve user provided label if exists diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 75dea9e..2b41856 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,6 +8,7 @@ package org.opensearch.plugin.insights.core.service; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings; import java.io.IOException; @@ -27,6 +28,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import 
org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer; +import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; @@ -73,6 +75,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ final QueryInsightsExporterFactory queryInsightsExporterFactory; + /** + * Flags for enabling insight data grouping for different metric types + */ + private GroupingType groupingType; + private volatile boolean searchQueryMetricsEnabled; private SearchQueryCategorizer searchQueryCategorizer; @@ -112,16 +119,17 @@ public QueryInsightsService( this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry); this.enableSearchQueryMetricsFeature(false); + this.groupingType = DEFAULT_GROUPING_TYPE; } /** * Ingest the query data into in-memory stores * * @param record the record to ingest - * @return SearchQueryRecord + * @return true/false */ public boolean addRecord(final SearchQueryRecord record) { - boolean shouldAdd = searchQueryMetricsEnabled; + boolean shouldAdd = isSearchQueryMetricsFeatureEnabled() || isGroupingEnabled(); if (!shouldAdd) { for (Map.Entry entry : topQueriesServices.entrySet()) { if (!enableCollect.get(entry.getKey())) { @@ -185,6 +193,67 @@ public void enableCollection(final MetricType metricType, final boolean enable) this.topQueriesServices.get(metricType).setEnabled(enable); } + /** + * Validate grouping given grouping type setting + * @param groupingTypeSetting grouping setting + */ + public void validateGrouping(final String groupingTypeSetting) { + GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting); + } + + /** + * Set grouping + * @param groupingTypeSetting grouping + */ + public void setGrouping(final String groupingTypeSetting) { + GroupingType newGroupingType = 
GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting); + GroupingType oldGroupingType = groupingType; + + if (oldGroupingType != newGroupingType) { + groupingType = newGroupingType; + + for (MetricType metricType : MetricType.allMetricTypes()) { + this.topQueriesServices.get(metricType).setGrouping(newGroupingType); + } + } + } + + /** + * Set max number of groups + * @param maxGroups maximum number of groups that should be tracked when calculating Top N groups + */ + public void setMaximumGroups(final int maxGroups) { + for (MetricType metricType : MetricType.allMetricTypes()) { + this.topQueriesServices.get(metricType).setMaxGroups(maxGroups); + } + } + + /** + * Validate max number of groups. Should be between 0 and MAX_GROUPS_EXCLUDING_TOPN_LIMIT + * @param maxGroups maximum number of groups that should be tracked when calculating Top N groups + */ + public void validateMaximumGroups(final int maxGroups) { + if (maxGroups < 0 || maxGroups > QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT) { + throw new IllegalArgumentException( + "Max groups setting" + + " should be between 0 and " + + QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT + + ", was (" + + maxGroups + + ")" + ); + } + } + + /** + * Get the currently configured grouping type + * @return GroupingType + */ + + public GroupingType getGrouping() { + return groupingType; + } + /** * Get if the Query Insights data collection is enabled for a MetricType * @@ -226,9 +295,18 @@ public boolean isSearchQueryMetricsFeatureEnabled() { return this.searchQueryMetricsEnabled; } + /** + * Is grouping feature enabled and TopN feature enabled + * @return boolean + */ + public boolean isGroupingEnabled() { + return this.groupingType != GroupingType.NONE && isTopNFeatureEnabled(); + } + /** * Enable/Disable search query metrics feature. 
* @param enable enable/disable search query metrics feature + * Stops query insights service if no features enabled */ public void enableSearchQueryMetricsFeature(boolean enable) { searchQueryMetricsEnabled = enable; diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index b79e4e7..9bd2dd0 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -24,7 +24,7 @@ import java.util.Collection; import java.util.List; import java.util.Locale; -import java.util.PriorityQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,6 +35,10 @@ import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; import org.opensearch.plugin.insights.core.exporter.SinkType; +import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper; +import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper; +import org.opensearch.plugin.insights.rules.model.AggregationType; +import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; @@ -66,7 +70,7 @@ public class TopQueriesService { /** * The internal thread-safe store that holds the top n queries insight data */ - private final PriorityQueue topQueriesStore; + private final PriorityBlockingQueue topQueriesStore; /** * The AtomicReference of a snapshot of the current window top queries for getters to consume @@ -93,6 +97,8 @@ public class 
TopQueriesService { */ private QueryInsightsExporter exporter; + private QueryGrouper queryGrouper; + TopQueriesService( final MetricType metricType, final ThreadPool threadPool, @@ -106,9 +112,16 @@ public class TopQueriesService { this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; this.windowStart = -1L; this.exporter = null; - topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); + queryGrouper = new MinMaxHeapQueryGrouper( + metricType, + QueryInsightsSettings.DEFAULT_GROUPING_TYPE, + AggregationType.AVERAGE, + topQueriesStore, + topNSize + ); } /** @@ -118,6 +131,7 @@ public class TopQueriesService { */ public void setTopNSize(final int topNSize) { this.topNSize = topNSize; + this.queryGrouper.updateTopNSize(topNSize); } /** @@ -169,6 +183,20 @@ public void setWindowSize(final TimeValue windowSize) { this.windowStart = -1L; } + public void setGrouping(final GroupingType groupingType) { + boolean changed = queryGrouper.setGroupingType(groupingType); + if (changed) { + drain(); + } + } + + public void setMaxGroups(final int maxGroups) { + boolean changed = queryGrouper.setMaxGroups(maxGroups); + if (changed) { + drain(); + } + } + /** * Validate if the window size is valid, based on internal constrains. 
* @@ -306,10 +334,16 @@ void consumeRecords(final List records) { } private void addToTopNStore(final List records) { - topQueriesStore.addAll(records); - // remove top elements for fix sizing priority queue - while (topQueriesStore.size() > topNSize) { - topQueriesStore.poll(); + if (queryGrouper.getGroupingType() != GroupingType.NONE) { + for (SearchQueryRecord record : records) { + queryGrouper.add(record); + } + } else { + topQueriesStore.addAll(records); + // remove top elements for fix sizing priority queue + while (topQueriesStore.size() > topNSize) { + topQueriesStore.poll(); + } } } @@ -329,6 +363,9 @@ private void rotateWindowIfNecessary(final long newWindowStart) { } topQueriesHistorySnapshot.set(history); topQueriesStore.clear(); + if (queryGrouper.getGroupingType() != GroupingType.NONE) { + queryGrouper.drain(); + } topQueriesCurrentSnapshot.set(new ArrayList<>()); windowStart = newWindowStart; // export to the configured sink @@ -368,4 +405,13 @@ public List getTopQueriesCurrentSnapshot() { public void close() throws IOException { queryInsightsExporterFactory.closeExporter(this.exporter); } + + /** + * Drain internal stores. 
+ */ + private void drain() { + topQueriesStore.clear(); + topQueriesHistorySnapshot.set(new ArrayList<>()); + topQueriesCurrentSnapshot.set(new ArrayList<>()); + } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/QueryShapeGenerator.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/QueryShapeGenerator.java index aedc634..5664f3c 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/QueryShapeGenerator.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/QueryShapeGenerator.java @@ -90,6 +90,12 @@ public static MurmurHash3.Hash128 getShapeHashCode(SearchSourceBuilder source, B return MurmurHash3.hash128(shapeBytes.bytes, 0, shapeBytes.length, 0, new MurmurHash3.Hash128()); } + public static String getShapeHashCodeAsString(SearchSourceBuilder source, Boolean showFields) { + MurmurHash3.Hash128 hashcode = getShapeHashCode(source, showFields); + String hashAsString = Long.toHexString(hashcode.h1) + Long.toHexString(hashcode.h2); + return hashAsString; + } + /** * Method to build search query shape given a source * @param source search request source diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java index 7ed861f..534d067 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryAggregationCategorizer.java @@ -10,6 +10,7 @@ import java.util.Collection; import java.util.Map; +import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.search.aggregations.AggregationBuilder; import 
org.opensearch.search.aggregations.PipelineAggregationBuilder; @@ -39,14 +40,14 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters */ public void incrementSearchQueryAggregationCounters( Collection aggregatorFactories, - Map measurements + Map measurements ) { for (AggregationBuilder aggregationBuilder : aggregatorFactories) { incrementCountersRecursively(aggregationBuilder, measurements); } } - private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Map measurements) { + private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Map measurements) { // Increment counters for the current aggregation String aggregationType = aggregationBuilder.getType(); searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(AGGREGATION_TYPE_TAG, aggregationType), measurements); diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java index af89a37..df3cb7b 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizer.java @@ -15,6 +15,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilderVisitor; import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.search.aggregations.AggregatorFactories; @@ -81,7 +82,7 @@ public void consumeRecords(List records) { */ public void categorize(SearchQueryRecord record) { SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE); - Map measurements = 
record.getMeasurements(); + Map measurements = record.getMeasurements(); incrementQueryTypeCounters(source.query(), measurements); incrementQueryAggregationCounters(source.aggregations(), measurements); @@ -93,7 +94,7 @@ public void categorize(SearchQueryRecord record) { } } - private void incrementQuerySortCounters(List> sorts, Map measurements) { + private void incrementQuerySortCounters(List> sorts, Map measurements) { if (sorts != null && sorts.size() > 0) { for (SortBuilder sortBuilder : sorts) { String sortOrder = sortBuilder.order().toString(); @@ -102,7 +103,7 @@ private void incrementQuerySortCounters(List> sorts, Map measurements) { + private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations, Map measurements) { if (aggregations == null) { return; } @@ -110,7 +111,7 @@ private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggre searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories(), measurements); } - private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder, Map measurements) { + private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder, Map measurements) { if (topLevelQueryBuilder == null) { return; } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizingVisitor.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizingVisitor.java index f6addca..98c9bf6 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizingVisitor.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCategorizingVisitor.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.BooleanClause; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilderVisitor; +import org.opensearch.plugin.insights.rules.model.Measurement; import 
org.opensearch.plugin.insights.rules.model.MetricType; /** @@ -21,13 +22,13 @@ final class SearchQueryCategorizingVisitor implements QueryBuilderVisitor { private final int level; private final SearchQueryCounters searchQueryCounters; - private final Map measurements; + private final Map measurements; - public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters, Map measurements) { + public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters, Map measurements) { this(searchQueryCounters, 0, measurements); } - private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level, Map measurements) { + private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level, Map measurements) { this.searchQueryCounters = counters; this.level = level; this.measurements = measurements; diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java index cb89022..21f4aae 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/categorizer/SearchQueryCounters.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.opensearch.index.query.QueryBuilder; +import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; @@ -108,7 +109,7 @@ public SearchQueryCounters(MetricsRegistry metricsRegistry) { * @param level level of query builder, 0 being highest level * @param measurements metrics measurements */ - public void incrementCounter(QueryBuilder queryBuilder, int level, Map measurements) { + public void incrementCounter(QueryBuilder queryBuilder, int level, Map 
measurements) { String uniqueQueryCounterName = queryBuilder.getName(); Counter counter = nameToQueryTypeCounters.computeIfAbsent(uniqueQueryCounterName, k -> createQueryCounter(k)); @@ -122,7 +123,7 @@ public void incrementCounter(QueryBuilder queryBuilder, int level, Map measurements) { + public void incrementAggCounter(double value, Tags tags, Map measurements) { aggCounter.add(value, tags); incrementAllHistograms(tags, measurements); } @@ -133,15 +134,15 @@ public void incrementAggCounter(double value, Tags tags, Map * @param tags tags * @param measurements metrics measurements */ - public void incrementSortCounter(double value, Tags tags, Map measurements) { + public void incrementSortCounter(double value, Tags tags, Map measurements) { sortCounter.add(value, tags); incrementAllHistograms(tags, measurements); } - private void incrementAllHistograms(Tags tags, Map measurements) { - queryTypeLatencyHistogram.record(measurements.get(MetricType.LATENCY).doubleValue(), tags); - queryTypeCpuHistogram.record(measurements.get(MetricType.CPU).doubleValue(), tags); - queryTypeMemoryHistogram.record(measurements.get(MetricType.MEMORY).doubleValue(), tags); + private void incrementAllHistograms(Tags tags, Map measurements) { + queryTypeLatencyHistogram.record(measurements.get(MetricType.LATENCY).getMeasurement().doubleValue(), tags); + queryTypeCpuHistogram.record(measurements.get(MetricType.CPU).getMeasurement().doubleValue(), tags); + queryTypeMemoryHistogram.record(measurements.get(MetricType.MEMORY).getMeasurement().doubleValue(), tags); } /** diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java new file mode 100644 index 0000000..9b87f2d --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java @@ -0,0 +1,276 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * 
The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service.grouper; + +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.collect.Tuple; +import org.opensearch.plugin.insights.rules.model.AggregationType; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.GroupingType; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; + +/** + * Handles grouping of search queries based on the GroupingType for the MetricType + * Following algorithm : https://github.com/opensearch-project/OpenSearch/issues/13357#issuecomment-2269706425 + */ +public class MinMaxHeapQueryGrouper implements QueryGrouper { + + /** + * Logger + */ + private static final Logger log = LogManager.getLogger(MinMaxHeapQueryGrouper.class); + /** + * Grouping type for the current grouping service + */ + private volatile GroupingType groupingType; + /** + * Metric type for the current grouping service + */ + private MetricType metricType; + + /** + * Aggregation type for the current grouping service + */ + private AggregationType aggregationType; + /** + * Map storing groupingId to Tuple containing Aggregate search query record and boolean. + * SearchQueryRecord: Aggregate search query record to store the aggregate of a metric type based on the aggregation type.. + * Example: Average latency. 
This query record will be used to store the average latency for multiple query records + * in this case. + * boolean: True if the aggregate record is in the Top N queries priority queue (min heap) and False if the aggregate + * record is in the Max Heap + */ + private ConcurrentHashMap> groupIdToAggSearchQueryRecord; + /** + * Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore + */ + private PriorityBlockingQueue minHeapTopQueriesStore; + /** + * The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap. + * It stores all records not included in the Top N query results. When the aggregate measurement for one of these + * records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap, + * and the records are rearranged accordingly. + */ + private PriorityBlockingQueue maxHeapQueryStore; + + /** + * Top N size based on the configuration set + */ + private int topNSize; + + /** + * To keep track of Top N groups we need to store details of all the groups encountered in the window. + * This value can be arbitrarily large and we need to limit this. + * Following is the maximum number of groups that should be tracked when calculating Top N groups and we have a + * cluster setting to configure. 
+ */ + private int maxGroups; + + public MinMaxHeapQueryGrouper( + MetricType metricType, + GroupingType groupingType, + AggregationType aggregationType, + PriorityBlockingQueue topQueriesStore, + int topNSize + ) { + this.groupingType = groupingType; + this.metricType = metricType; + this.aggregationType = aggregationType; + this.groupIdToAggSearchQueryRecord = new ConcurrentHashMap<>(); + this.minHeapTopQueriesStore = topQueriesStore; + this.topNSize = topNSize; + this.maxGroups = QueryInsightsSettings.DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT; + this.maxHeapQueryStore = new PriorityBlockingQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType)); + } + + /** + * Add query to the group based on the GroupType setting. + * The grouping of metrics will be stored within the searchQueryRecord. + * @param searchQueryRecord record + * @return return the search query record that represents the group + */ + @Override + public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) { + if (groupingType == GroupingType.NONE) { + throw new IllegalArgumentException("Do not use addQueryToGroup when GroupingType is None"); + } + SearchQueryRecord aggregateSearchQueryRecord; + String groupId = getGroupingId(searchQueryRecord); + + // 1) New group added to the grouping service + // Add to min PQ and overflow records to max PQ (if the number of records in the min PQ exceeds the configured size N) + // 2) Existing group being updated to the grouping service + // a. If present in min PQ + // - remove the record from the min PQ + // b. 
If present in max PQ + // - remove the record from the max PQ + // Add to min PQ and promote to max + // If max PQ is empty return else try to promote record from max to min + if (!groupIdToAggSearchQueryRecord.containsKey(groupId)) { + boolean maxGroupsLimitReached = checkMaxGroupsLimitReached(groupId); + if (maxGroupsLimitReached) { + return null; + } + aggregateSearchQueryRecord = searchQueryRecord; + aggregateSearchQueryRecord.setGroupingId(groupId); + aggregateSearchQueryRecord.setMeasurementAggregation(metricType, aggregationType); + addToMinPQ(aggregateSearchQueryRecord, groupId); + } else { + aggregateSearchQueryRecord = groupIdToAggSearchQueryRecord.get(groupId).v1(); + boolean isPresentInMinPQ = groupIdToAggSearchQueryRecord.get(groupId).v2(); + if (isPresentInMinPQ) { + minHeapTopQueriesStore.remove(aggregateSearchQueryRecord); + } else { + maxHeapQueryStore.remove(aggregateSearchQueryRecord); + } + addAndPromote(searchQueryRecord, aggregateSearchQueryRecord, groupId); + } + return aggregateSearchQueryRecord; + } + + /** + * Drain the internal grouping. Needs to be performed after every window or if a setting is changed. + */ + @Override + public void drain() { + log.debug("Number of groups for the current window is " + numberOfGroups()); + groupIdToAggSearchQueryRecord.clear(); + maxHeapQueryStore.clear(); + minHeapTopQueriesStore.clear(); + } + + /** + * Set Grouping Type + * @param newGroupingType grouping type + * @return grouping type changed + */ + @Override + public boolean setGroupingType(GroupingType newGroupingType) { + if (this.groupingType != newGroupingType) { + this.groupingType = newGroupingType; + drain(); + return true; + } + return false; + } + + /** + * Get Grouping Type + * @return grouping type + */ + @Override + public GroupingType getGroupingType() { + return groupingType; + } + + /** + * Set the maximum number of groups that should be tracked when calculating Top N groups. 
+ * If the value changes, reset the state of the query grouper service by draining all internal data. + * @param maxGroups max number of groups + * @return max groups changed + */ + @Override + public boolean setMaxGroups(int maxGroups) { + if (this.maxGroups != maxGroups) { + this.maxGroups = maxGroups; + drain(); + return true; + } + return false; + } + + /** + * Update Top N size + * @param newSize new size + */ + @Override + public void updateTopNSize(int newSize) { + this.topNSize = newSize; + } + + private void addToMinPQ(SearchQueryRecord searchQueryRecord, String groupId) { + minHeapTopQueriesStore.add(searchQueryRecord); + groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true)); + overflow(); + } + + private void addAndPromote(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) { + Number measurementToAdd = searchQueryRecord.getMeasurement(metricType); + aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd); + addToMinPQ(aggregateSearchQueryRecord, groupId); + if (maxHeapQueryStore.isEmpty()) { + return; + } + if (SearchQueryRecord.compare(maxHeapQueryStore.peek(), minHeapTopQueriesStore.peek(), metricType) > 0) { + SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll(); + addToMinPQ(recordMovedFromMaxToMin, recordMovedFromMaxToMin.getGroupingId()); + } + } + + private void overflow() { + if (minHeapTopQueriesStore.size() > topNSize) { + SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll(); + maxHeapQueryStore.add(recordMovedFromMinToMax); + groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false)); + } + } + + private boolean checkMaxGroupsLimitReached(String groupId) { + if (maxGroups <= maxHeapQueryStore.size() && minHeapTopQueriesStore.size() >= topNSize) { + log.warn( + "Exceeded [{}] setting threshold which is set at {}. 
Discarding new group with id {}.", + TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N.getKey(), + maxGroups, + groupId + ); + return true; + } + return false; + } + + /** + * Gives the number of groups as part of the current grouping. + * @return number of groups + */ + + int numberOfGroups() { + return groupIdToAggSearchQueryRecord.size(); + } + + /** + * Gives the number of groups that are part of the top groups + * @return number of top groups + */ + int numberOfTopGroups() { + return minHeapTopQueriesStore.size(); + } + + /** + * Get groupingId. This should be query hashcode for SIMILARITY grouping and user_id for USER_ID grouping. + * @param searchQueryRecord record + * @return Grouping Id + */ + private String getGroupingId(SearchQueryRecord searchQueryRecord) { + switch (groupingType) { + case SIMILARITY: + return searchQueryRecord.getAttributes().get(Attribute.QUERY_HASHCODE).toString(); + case NONE: + throw new IllegalArgumentException("Should not try to group queries if grouping type is NONE"); + default: + throw new IllegalArgumentException("The following grouping type is not supported : " + groupingType); + } + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java new file mode 100644 index 0000000..07ba769 --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service.grouper; + +import org.opensearch.plugin.insights.rules.model.GroupingType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +/** + * Interface for grouping search queries based on grouping type for the metric type. 
+ */ +public interface QueryGrouper { + + /** + * Add query to the group based on the GroupType setting. + * @param searchQueryRecord record to be added + * @return the aggregate search query record representing the group + */ + SearchQueryRecord add(SearchQueryRecord searchQueryRecord); + + /** + * Drain the internal grouping. Needs to be performed after every window or if a setting is changed. + */ + void drain(); + + /** + * Set the grouping type for this grouper. + * + * @param groupingType the grouping type to set + * @return grouping type changed + */ + boolean setGroupingType(GroupingType groupingType); + + /** + * Get the current grouping type for this grouper. + * + * @return the current grouping type + */ + GroupingType getGroupingType(); + + /** + * Set the maximum number of groups allowed. + * + * @param maxGroups the maximum number of groups + * @return max groups changed + */ + boolean setMaxGroups(int maxGroups); + + /** + * Update the top N size for the grouper. + * + * @param topNSize the new top N size + */ + void updateTopNSize(int topNSize); +} diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/AggregationType.java b/src/main/java/org/opensearch/plugin/insights/rules/model/AggregationType.java new file mode 100644 index 0000000..b49cc5a --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/AggregationType.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +/** + * Aggregation type for a measurement. Default is NONE and average is used for grouping Top N queries by similarity. 
/**
 * Aggregation type for a measurement. Default is NONE and average is used for grouping Top N queries by similarity.
 */
public enum AggregationType {
    NONE,
    AVERAGE,
    SUM;

    /**
     * Aggregation type applied when none is specified.
     * Declared {@code final} so that no caller can reassign this shared default.
     */
    public static final AggregationType DEFAULT_AGGREGATION_TYPE = NONE;
}
/**
 * Grouping type to group the Top N queries
 */
public enum GroupingType {
    NONE("none"),
    SIMILARITY("similarity");

    private final String stringValue;

    GroupingType(String stringValue) {
        this.stringValue = stringValue;
    }

    /**
     * Get the lower-case string form of this grouping type.
     *
     * @return the string value of this grouping type
     */
    public String getValue() {
        return stringValue;
    }

    /**
     * Get all valid GroupingTypes
     *
     * @return A set contains all valid GroupingTypes
     */
    public static Set<GroupingType> allGroupingTypes() {
        return Arrays.stream(values()).collect(Collectors.toSet());
    }

    /**
     * Get grouping type from setting string and validate. Matching is case-insensitive.
     *
     * @param settingValue value
     * @return GroupingType
     * @throws IllegalArgumentException if the value is null or does not name a grouping type
     */
    public static GroupingType getGroupingTypeFromSettingAndValidate(String settingValue) {
        if (settingValue == null) {
            throw new IllegalArgumentException(invalidGroupingTypeMessage(null));
        }
        try {
            return GroupingType.valueOf(settingValue.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Keep the original exception as the cause so the failure can be traced.
            throw new IllegalArgumentException(invalidGroupingTypeMessage(settingValue), e);
        }
    }

    /**
     * Build the validation error message. The valid types are listed in declaration order so the
     * message is the same on every run.
     */
    private static String invalidGroupingTypeMessage(String settingValue) {
        return String.format(
            Locale.ROOT,
            "Invalid grouping type [%s], type should be one of %s",
            settingValue,
            Arrays.toString(values())
        );
    }
}
+ */ + +package org.opensearch.plugin.insights.rules.model; + +import java.io.IOException; +import java.util.Objects; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +/** + * Measurement that is stored in the SearchQueryRecord. Measurement can be of a specific AggregationType + */ +public class Measurement implements ToXContentObject, Writeable { + private static int DEFAULT_COUNT = 1; + private AggregationType aggregationType; + private Number number; + private int count; + + /** + * Constructor + * @param number number + * @param count count + * @param aggregationType aggregationType + */ + public Measurement(Number number, int count, AggregationType aggregationType) { + this.number = number; + this.count = count; + this.aggregationType = aggregationType; + } + + /** + * Constructor + * @param number number + * @param aggregationType aggregationType + */ + public Measurement(Number number, AggregationType aggregationType) { + this(number, DEFAULT_COUNT, aggregationType); + } + + /** + * Constructor + * @param number number + */ + public Measurement(Number number) { + this(number, DEFAULT_COUNT, AggregationType.DEFAULT_AGGREGATION_TYPE); + } + + /** + * Add measurement number to the current number based on the aggregationType. + * If aggregateType is NONE, replace the number since we are not aggregating in this case. 
+ * @param toAdd number to add + */ + public void addMeasurement(Number toAdd) { + switch (aggregationType) { + case NONE: + setMeasurement(toAdd); + break; + case SUM: + setMeasurement(addMeasurementInferType(number, toAdd)); + break; + case AVERAGE: + count += 1; + setMeasurement(addMeasurementInferType(number, toAdd)); + break; + default: + throw new IllegalArgumentException("The following aggregation type is not supported : " + aggregationType); + } + } + + private Number addMeasurementInferType(Number a, Number b) { + if (a instanceof Long && b instanceof Long) { + return a.longValue() + b.longValue(); + } else if (a instanceof Integer && b instanceof Integer) { + return a.intValue() + b.intValue(); + } else if (a instanceof Double && b instanceof Double) { + return a.doubleValue() + b.doubleValue(); + } else if (a instanceof Float && b instanceof Float) { + return a.floatValue() + b.floatValue(); + } else { + throw new IllegalArgumentException("Unsupported number type: " + a.getClass() + " or " + b.getClass()); + } + } + + /** + * Get measurement number based on the aggragation type + * @return measurement number + */ + public Number getMeasurement() { + switch (aggregationType) { + case NONE: + case SUM: + return number; + case AVERAGE: + return getAverageMeasurement(number, count); + default: + throw new IllegalArgumentException("Aggregation Type should be set for measurement."); + } + } + + /** + * Get average measurement number based on the total and count + * @param total total measurement value + * @param count count of measurements + * @return average measurement value + */ + private Number getAverageMeasurement(Number total, int count) { + if (count == 0) { + throw new IllegalArgumentException("Count cannot be zero for average calculation."); + } + + if (total instanceof Long) { + return ((Long) total) / count; + } else if (total instanceof Integer) { + return ((Integer) total) / count; + } else if (total instanceof Double) { + return ((Double) total) 
/ count; + } else if (total instanceof Float) { + return ((Float) total) / count; + } else { + throw new IllegalArgumentException("Unsupported number type: " + total.getClass()); + } + } + + /** + * Set measurement + * @param measurement measurement number + */ + public void setMeasurement(Number measurement) { + number = measurement; + } + + /** + * Set aggregation type + * @param aggregationType aggregation type + */ + public void setAggregationType(AggregationType aggregationType) { + this.aggregationType = aggregationType; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("number", number); + builder.field("count", count); + builder.field("aggregationType", aggregationType.toString()); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + writeNumber(out, number); + out.writeInt(count); + out.writeString(aggregationType.toString()); + } + + private void writeNumber(StreamOutput out, Number number) throws IOException { + if (number instanceof Long) { + out.writeByte((byte) 0); // Type indicator for Long + out.writeLong((Long) number); + } else if (number instanceof Integer) { + out.writeByte((byte) 1); // Type indicator for Integer + out.writeInt((Integer) number); + } else if (number instanceof Double) { + out.writeByte((byte) 2); // Type indicator for Double + out.writeDouble((Double) number); + } else if (number instanceof Float) { + out.writeByte((byte) 3); // Type indicator for Float + out.writeFloat((Float) number); + } else { + throw new IOException("Unsupported number type: " + number.getClass()); + } + } + + private static Number readNumber(StreamInput in) throws IOException { + byte typeIndicator = in.readByte(); + switch (typeIndicator) { + case 0: + return in.readLong(); + case 1: + return in.readInt(); + case 2: + return in.readDouble(); + case 3: + return 
in.readFloat(); + default: + throw new IOException("Unsupported number type indicator: " + typeIndicator); + } + } + + public static Measurement readFromStream(StreamInput in) throws IOException { + Number number = readNumber(in); + int count = in.readInt(); + AggregationType aggregationType = AggregationType.valueOf(in.readString()); + return new Measurement(number, count, aggregationType); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Measurement that = (Measurement) o; + return count == that.count && Objects.equals(number, that.number) && aggregationType == that.aggregationType; + } + + @Override + public int hashCode() { + return Objects.hash(number, count, aggregationType); + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 7283bd1..42ee029 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -10,8 +10,10 @@ import java.io.IOException; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import org.opensearch.Version; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -26,9 +28,11 @@ * which contains extensive information related to a search query. 
*/ public class SearchQueryRecord implements ToXContentObject, Writeable { + public static final String MEASUREMENTS = "measurements"; private final long timestamp; - private final Map measurements; + private final Map measurements; private final Map attributes; + private String groupingId; /** * Constructor of SearchQueryRecord @@ -39,9 +43,21 @@ public class SearchQueryRecord implements ToXContentObject, Writeable { */ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { this.timestamp = in.readLong(); - measurements = new HashMap<>(); - in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) - .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); + if (in.getVersion().onOrAfter(Version.V_2_17_0)) { + measurements = new LinkedHashMap<>(); + in.readOrderedMap(MetricType::readFromStream, Measurement::readFromStream) + .forEach(((metricType, measurement) -> measurements.put(metricType, measurement))); + this.groupingId = null; + } else { + measurements = new HashMap<>(); + in.readMap(MetricType::readFromStream, StreamInput::readGenericValue).forEach((metricType, o) -> { + try { + measurements.put(metricType, new Measurement(metricType.parseValue(o))); + } catch (ClassCastException e) { + throw new ClassCastException("Error parsing value for metric type: " + metricType); + } + }); + } this.attributes = Attribute.readAttributeMap(in); } @@ -52,7 +68,7 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce * @param measurements A list of Measurement associated with this query * @param attributes A list of Attributes associated with this query */ - public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { + public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { if (measurements == null) { throw new IllegalArgumentException("Measurements cannot be null"); } @@ -77,7 +93,7 @@ public long 
getTimestamp() { * @return the measurement object, or null if not found */ public Number getMeasurement(final MetricType name) { - return measurements.get(name); + return measurements.get(name).getMeasurement(); } /** @@ -85,7 +101,26 @@ public Number getMeasurement(final MetricType name) { * * @return a map of measurement names to measurement objects */ - public Map getMeasurements() { + + /** + * Add measurement to SearchQueryRecord. Applicable when we are grouping multiple queries based on GroupingType. + * @param metricType the name of the measurement + * @param numberToAdd The measurement number we want to add to the current measurement. + */ + public void addMeasurement(final MetricType metricType, Number numberToAdd) { + measurements.get(metricType).addMeasurement(numberToAdd); + } + + /** + * Set the aggregation type for measurement + * @param name the name of the measurement + * @param aggregationType Aggregation type to set + */ + public void setMeasurementAggregation(final MetricType name, AggregationType aggregationType) { + measurements.get(name).setAggregationType(aggregationType); + } + + public Map getMeasurements() { return measurements; } @@ -115,9 +150,12 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten for (Map.Entry entry : attributes.entrySet()) { builder.field(entry.getKey().toString(), entry.getValue()); } - for (Map.Entry entry : measurements.entrySet()) { - builder.field(entry.getKey().toString(), entry.getValue()); + builder.startObject(MEASUREMENTS); + for (Map.Entry entry : measurements.entrySet()) { + builder.field(entry.getKey().toString()); // MetricType as field name + entry.getValue().toXContent(builder, params); // Serialize Measurement object } + builder.endObject(); return builder.endObject(); } @@ -130,7 +168,15 @@ public XContentBuilder toXContent(final XContentBuilder builder, final ToXConten @Override public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); 
- out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); + if (out.getVersion().onOrAfter(Version.V_2_17_0)) { + out.writeMap( + measurements, + (stream, metricType) -> MetricType.writeTo(out, metricType), + (stream, measurement) -> measurement.writeTo(out) + ); + } else { + out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); + } out.writeMap( attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), @@ -181,4 +227,12 @@ public int hashCode() { public String toString() { return Strings.toString(MediaTypeRegistry.JSON, this); } + + public void setGroupingId(String groupingId) { + this.groupingId = groupingId; + } + + public String getGroupingId() { + return this.groupingId; + } } diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index cb9b39d..89290bc 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -16,6 +16,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.core.exporter.SinkType; +import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; /** @@ -69,6 +70,11 @@ public class QueryInsightsSettings { */ public static final String PLUGINS_BASE_URI = "/_insights"; + public static final GroupingType DEFAULT_GROUPING_TYPE = GroupingType.NONE; + public static final int DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT = 100; + + public static final int MAX_GROUPS_EXCLUDING_TOPN_LIMIT = 10000; + /** * Settings for Top Queries * @@ -112,6 +118,26 @@ public class QueryInsightsSettings { Setting.Property.Dynamic ); + /** + * Define the 
group_by option for Top N queries to group queries. + */ + public static final Setting TOP_N_QUERIES_GROUP_BY = Setting.simpleString( + TOP_N_QUERIES_SETTING_PREFIX + ".group_by", + DEFAULT_GROUPING_TYPE.getValue(), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Define the max_groups_excluding_topn option for Top N queries to group queries. + */ + public static final Setting TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N = Setting.intSetting( + TOP_N_QUERIES_SETTING_PREFIX + ".max_groups_excluding_topn", + DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** * Boolean setting for enabling top queries by cpu. */ diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java index 7a921e4..fca21e5 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -75,6 +75,8 @@ public void testGetSettings() { QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE, QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE, QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS, + QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY, + QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N, QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING ), queryInsightsPlugin.getSettings() diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 6b2d8c6..6fb9dff 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -25,9 +25,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; 
import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import org.opensearch.action.search.SearchType; @@ -39,7 +41,9 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; +import org.opensearch.plugin.insights.rules.model.AggregationType; import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryCategorizationSettings; @@ -57,41 +61,69 @@ public QueryInsightsTestUtils() {} * @return List of records */ public static List generateQueryInsightRecords(int count) { - return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, AggregationType.DEFAULT_AGGREGATION_TYPE); } /** - * Returns list of randomly generated search query records. 
+ * Returns list of randomly generated search query records with specific searchSourceBuilder * @param count number of records * @param searchSourceBuilder source * @return List of records */ public static List generateQueryInsightRecords(int count, SearchSourceBuilder searchSourceBuilder) { - List records = generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + List records = generateQueryInsightRecords( + count, + count, + System.currentTimeMillis(), + 0, + AggregationType.DEFAULT_AGGREGATION_TYPE + ); for (SearchQueryRecord record : records) { record.getAttributes().put(Attribute.SOURCE, searchSourceBuilder); } return records; } + /** + * Returns list of randomly generated search query records with specific aggregation type for measurements + * @param count number of records + * @param aggregationType source + * @return List of records + */ + public static List generateQueryInsightRecords(int count, AggregationType aggregationType) { + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0, aggregationType); + } + /** * Creates a List of random Query Insight Records for testing purpose */ public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { + return generateQueryInsightRecords(lower, upper, startTimeStamp, interval, AggregationType.NONE); + } + + /** + * Creates a List of random Query Insight Records for testing purpose with dimenstion type specified + */ + public static List generateQueryInsightRecords( + int lower, + int upper, + long startTimeStamp, + long interval, + AggregationType aggregationType + ) { List records = new ArrayList<>(); int countOfRecords = randomIntBetween(lower, upper); long timestamp = startTimeStamp; for (int i = 0; i < countOfRecords; ++i) { - Map measurements = Map.of( - MetricType.LATENCY, - randomLongBetween(1000, 10000), - MetricType.CPU, - randomLongBetween(1000, 10000), - MetricType.MEMORY, - randomLongBetween(1000, 10000) - ); + 
long latencyValue = randomLongBetween(1000, 10000); // Replace with actual method to generate a random long + long cpuValue = randomLongBetween(1000, 10000); + long memoryValue = randomLongBetween(1000, 10000); + Map measurements = new LinkedHashMap<>(); + measurements.put(MetricType.LATENCY, new Measurement(latencyValue, aggregationType)); + measurements.put(MetricType.CPU, new Measurement(cpuValue, aggregationType)); + measurements.put(MetricType.MEMORY, new Measurement(memoryValue, aggregationType)); - Map phaseLatencyMap = new HashMap<>(); + Map phaseLatencyMap = new LinkedHashMap<>(); int countOfPhases = randomIntBetween(2, 5); for (int j = 0; j < countOfPhases; ++j) { phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); @@ -106,6 +138,7 @@ public static List generateQueryInsightRecords(int lower, int attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + attributes.put(Attribute.QUERY_HASHCODE, Objects.hashCode(i)); attributes.put( Attribute.TASK_RESOURCE_USAGES, List.of( @@ -132,6 +165,49 @@ public static List generateQueryInsightRecords(int lower, int return records; } + public static List generateQueryInsightsRecordsWithMeasurement( + int count, + MetricType metricType, + Number measurement + ) { + List records = generateQueryInsightRecords(count); + + for (SearchQueryRecord record : records) { + record.getMeasurements().get(metricType).setMeasurement(measurement); + } + return records; + } + + public static List> generateMultipleQueryInsightsRecordsWithMeasurement( + int count, + MetricType metricType, + List measurements + ) { + List> multipleRecordLists = new ArrayList<>(); + + for (int i = 0; i < measurements.size(); i++) { + List records = generateQueryInsightRecords(count); + multipleRecordLists.add(records); + for (SearchQueryRecord record : records) 
{ + record.getMeasurements().get(metricType).setMeasurement(measurements.get(i)); + } + QueryInsightsTestUtils.populateHashcode(records, i); + } + return multipleRecordLists; + } + + public static void populateSameQueryHashcodes(List searchQueryRecords) { + for (SearchQueryRecord record : searchQueryRecords) { + record.getAttributes().put(Attribute.QUERY_HASHCODE, 1); + } + } + + public static void populateHashcode(List searchQueryRecords, int hash) { + for (SearchQueryRecord record : searchQueryRecords) { + record.getAttributes().put(Attribute.QUERY_HASHCODE, hash); + } + } + public static TopQueries createRandomTopQueries() { DiscoveryNode node = new DiscoveryNode( "node_for_top_queries_test", @@ -161,7 +237,7 @@ public static TopQueries createFixedTopQueries() { public static SearchQueryRecord createFixedSearchQueryRecord() { long timestamp = 1706574180000L; - Map measurements = Map.of(MetricType.LATENCY, 1L); + Map measurements = Map.of(MetricType.LATENCY, new Measurement(1L)); Map phaseLatencyMap = new HashMap<>(); Map attributes = new HashMap<>(); @@ -246,6 +322,8 @@ public static void registerAllQueryInsightsSettings(ClusterSettings clusterSetti clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N); clusterSettings.registerSetting(QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING); } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 56f3203..e0fb47d 100644 --- 
a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -11,11 +11,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import java.util.List; import org.junit.Before; import org.opensearch.client.Client; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; @@ -87,4 +89,71 @@ public void testSearchQueryMetricsEnabled() { assertNotNull(queryInsightsService.getSearchQueryCategorizer()); } + + public void testAddRecordGroupBySimilarityWithDifferentGroups() { + + int numberOfRecordsRequired = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightsRecordsWithMeasurement( + numberOfRecordsRequired, + MetricType.LATENCY, + 5 + ); + + queryInsightsService.setGrouping(GroupingType.SIMILARITY.getValue()); + assertEquals(queryInsightsService.getGrouping(), GroupingType.SIMILARITY); + + for (int i = 0; i < numberOfRecordsRequired; i++) { + assertTrue(queryInsightsService.addRecord(records.get(i))); + } + // exceed capacity but handoff to grouping + assertTrue(queryInsightsService.addRecord(records.get(numberOfRecordsRequired - 1))); + + queryInsightsService.drainRecords(); + + assertEquals( + QueryInsightsSettings.DEFAULT_TOP_N_SIZE, + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() + ); + } + + public void testAddRecordGroupBySimilarityWithOneGroup() { + int numberOfRecordsRequired = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightsRecordsWithMeasurement( +
numberOfRecordsRequired, + MetricType.LATENCY, + 5 + ); + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + + queryInsightsService.setGrouping(GroupingType.SIMILARITY.getValue()); + assertEquals(queryInsightsService.getGrouping(), GroupingType.SIMILARITY); + + for (int i = 0; i < numberOfRecordsRequired; i++) { + assertTrue(queryInsightsService.addRecord(records.get(i))); + } + // exceed capacity but handoff to grouping service + assertTrue(queryInsightsService.addRecord(records.get(numberOfRecordsRequired - 1))); + + queryInsightsService.drainRecords(); + assertEquals(1, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size()); + } + + public void testAddRecordGroupBySimilarityWithTwoGroups() { + List<SearchQueryRecord> records1 = QueryInsightsTestUtils.generateQueryInsightRecords(2, 2, System.currentTimeMillis(), 0); + QueryInsightsTestUtils.populateHashcode(records1, 1); + + List<SearchQueryRecord> records2 = QueryInsightsTestUtils.generateQueryInsightRecords(2, 2, System.currentTimeMillis(), 0); + QueryInsightsTestUtils.populateHashcode(records2, 2); + + queryInsightsService.setGrouping(GroupingType.SIMILARITY.getValue()); + assertEquals(queryInsightsService.getGrouping(), GroupingType.SIMILARITY); + + for (int i = 0; i < 2; i++) { + assertTrue(queryInsightsService.addRecord(records1.get(i))); + assertTrue(queryInsightsService.addRecord(records2.get(i))); + } + + queryInsightsService.drainRecords(); + assertEquals(2, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size()); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index ca75560..d69fe53 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -17,6 +17,7 @@ import
org.opensearch.common.unit.TimeValue; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory; +import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; @@ -108,4 +109,39 @@ private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministic } } } + + public void testRollingWindowsWithDifferentGroup() { + topQueriesService.setGrouping(GroupingType.SIMILARITY); + List<SearchQueryRecord> records; + // Create 5 records at Now - 10 minutes to make sure they belong to the last window + records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + + // Create 10 records at now + 1 minute, to make sure they belong to the current window + records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + } + + public void testRollingWindowsWithSameGroup() { + topQueriesService.setGrouping(GroupingType.SIMILARITY); + List<SearchQueryRecord> records; + // Create 5 records at Now - 10 minutes to make sure they belong to the last window + records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(0,
topQueriesService.getTopQueriesRecords(true).size()); + + // Create 10 records at now + 1 minute, to make sure they belong to the current window + records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(1, topQueriesService.getTopQueriesRecords(true).size()); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java new file mode 100644 index 0000000..f92f49a --- /dev/null +++ b/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java @@ -0,0 +1,685 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service.grouper; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.PriorityBlockingQueue; +import org.junit.Before; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.AggregationType; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.GroupingType; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Unit Tests for {@link MinMaxHeapQueryGrouper}. 
+ */ +public class MinMaxHeapQueryGrouperTests extends OpenSearchTestCase { + private MinMaxHeapQueryGrouper minMaxHeapQueryGrouper; + private PriorityBlockingQueue<SearchQueryRecord> topQueriesStore = new PriorityBlockingQueue<>( + 100, + (a, b) -> SearchQueryRecord.compare(a, b, MetricType.LATENCY) + ); + + @Before + public void setup() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.DEFAULT_AGGREGATION_TYPE, 10); + } + + public void testWithAllDifferentHashcodes() { + int numOfRecords = 10; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + SearchQueryRecord groupedRecord; + Set<Integer> hashcodeSet = new HashSet<>(); + for (SearchQueryRecord record : records) { + groupedRecord = minMaxHeapQueryGrouper.add(record); + int hashcode = (int) groupedRecord.getAttributes().get(Attribute.QUERY_HASHCODE); + hashcodeSet.add(hashcode); + } + assertEquals(numOfRecords, hashcodeSet.size()); + } + + public void testWithAllSameHashcodes() { + int numOfRecords = 10; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord groupedRecord; + Set<Integer> hashcodeSet = new HashSet<>(); + for (SearchQueryRecord record : records) { + groupedRecord = minMaxHeapQueryGrouper.add(record); + int hashcode = (int) groupedRecord.getAttributes().get(Attribute.QUERY_HASHCODE); + hashcodeSet.add(hashcode); + } + assertEquals(1, hashcodeSet.size()); + } + + public void testDrain() { + int numOfRecords = 10; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + int groupsBeforeDrain = minMaxHeapQueryGrouper.numberOfGroups(); + minMaxHeapQueryGrouper.drain(); + int groupsAfterDrain = minMaxHeapQueryGrouper.numberOfGroups(); + + assertEquals(numOfRecords, groupsBeforeDrain); + assertEquals(0, groupsAfterDrain); + } + + public void
testChangeTopNSize() { + int numOfRecords = 15; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + assertEquals(10, minMaxHeapQueryGrouper.numberOfTopGroups()); // Initially expects top 10 groups + + minMaxHeapQueryGrouper.updateTopNSize(5); + minMaxHeapQueryGrouper.drain(); // Clear previous state + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + assertEquals(5, minMaxHeapQueryGrouper.numberOfTopGroups()); // After update, expects top 5 groups + } + + public void testEmptyPriorityQueues() { + int groupsBeforeDrain = minMaxHeapQueryGrouper.numberOfGroups(); + assertEquals(0, groupsBeforeDrain); + + minMaxHeapQueryGrouper.drain(); + int groupsAfterDrain = minMaxHeapQueryGrouper.numberOfGroups(); + assertEquals(0, groupsAfterDrain); // No groups should be present after draining + } + + public void testAddRemoveFromMaxHeap() { + int numOfRecords = 15; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + assertTrue(minMaxHeapQueryGrouper.numberOfTopGroups() <= 10); // Should be at most 10 in the min heap + + minMaxHeapQueryGrouper.updateTopNSize(5); // Change size to 5 + minMaxHeapQueryGrouper.drain(); // Clear previous state + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + assertEquals(5, minMaxHeapQueryGrouper.numberOfTopGroups()); // Should be exactly 5 in the min heap + } + + public void testInvalidGroupingType() { + MinMaxHeapQueryGrouper invalidGroupingService = new MinMaxHeapQueryGrouper( + MetricType.LATENCY, + GroupingType.NONE, + AggregationType.DEFAULT_AGGREGATION_TYPE, + topQueriesStore, + 10 + ); + SearchQueryRecord record = QueryInsightsTestUtils.generateQueryInsightRecords(1).get(0); +
expectThrows(IllegalArgumentException.class, () -> invalidGroupingService.add(record)); + } + + public void testLargeNumberOfRecords() { + int numOfRecords = 1000; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + assertTrue(minMaxHeapQueryGrouper.numberOfTopGroups() <= 10); // Should be at most 10 in the min heap + } + + public void testChangeGroupingType() { + int numOfRecords = 10; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + int groupsBeforeChange = minMaxHeapQueryGrouper.numberOfGroups(); + assertTrue(groupsBeforeChange > 0); + + minMaxHeapQueryGrouper.setGroupingType(GroupingType.NONE); // Changing to NONE should clear groups + + int groupsAfterChange = minMaxHeapQueryGrouper.numberOfGroups(); + assertEquals(0, groupsAfterChange); // Expect no groups after changing to NONE + } + + public void testDrainWithMultipleGroupingTypes() { + int numOfRecords = 20; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + int groupsBeforeDrain = minMaxHeapQueryGrouper.numberOfGroups(); + assertTrue(groupsBeforeDrain > 0); + + minMaxHeapQueryGrouper.setGroupingType(GroupingType.SIMILARITY); + minMaxHeapQueryGrouper.drain(); + + int groupsAfterDrain = minMaxHeapQueryGrouper.numberOfGroups(); + assertEquals(0, groupsAfterDrain); // After drain, groups should be cleared + } + + public void testVaryingTopNSize() { + int numOfRecords = 30; + final List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords); + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + minMaxHeapQueryGrouper.updateTopNSize(15); +
minMaxHeapQueryGrouper.drain(); // Clear previous state + + for (SearchQueryRecord record : records) { + minMaxHeapQueryGrouper.add(record); + } + + assertEquals(15, minMaxHeapQueryGrouper.numberOfTopGroups()); // Should reflect the updated top N size + } + + public void testAddMeasurementSumAggregationLatency() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.SUM, 10); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord aggregatedRecord = null; + + Number expectedSum = 0; + for (SearchQueryRecord record : records) { + aggregatedRecord = minMaxHeapQueryGrouper.add(record); + expectedSum = expectedSum.longValue() + record.getMeasurement(MetricType.LATENCY).longValue(); + } + + assertEquals(expectedSum, aggregatedRecord.getMeasurement(MetricType.LATENCY)); + } + + public void testAddMeasurementAverageAggregationLatency() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 10); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord aggregatedRecord = null; + + Number expectedSum = 0; + int expectedCount = 0; + for (SearchQueryRecord record : records) { + expectedSum = expectedSum.longValue() + record.getMeasurement(MetricType.LATENCY).longValue(); + aggregatedRecord = minMaxHeapQueryGrouper.add(record); + expectedCount += 1; + } + + long expectedAverage = (long) expectedSum / expectedCount; + assertEquals(expectedAverage, aggregatedRecord.getMeasurement(MetricType.LATENCY)); + } + + public void testAddMeasurementNoneAggregationLatency() { + minMaxHeapQueryGrouper =
getQueryGroupingService(AggregationType.NONE, 10); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord lastRecord = null; + + Number expectedValue = 0; + for (SearchQueryRecord record : records) { + expectedValue = record.getMeasurement(MetricType.LATENCY).longValue(); + lastRecord = minMaxHeapQueryGrouper.add(record); + } + + assertEquals(expectedValue, lastRecord.getMeasurement(MetricType.LATENCY)); + } + + public void testAddMeasurementSumAggregationCpu() { + minMaxHeapQueryGrouper = new MinMaxHeapQueryGrouper( + MetricType.CPU, + GroupingType.SIMILARITY, + AggregationType.SUM, + topQueriesStore, + 10 + ); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord aggregatedRecord = null; + + Number expectedSum = 0; + for (SearchQueryRecord record : records) { + aggregatedRecord = minMaxHeapQueryGrouper.add(record); + expectedSum = expectedSum.longValue() + record.getMeasurement(MetricType.CPU).longValue(); + } + + assertEquals(expectedSum, aggregatedRecord.getMeasurement(MetricType.CPU)); + } + + public void testAddMeasurementAverageAggregationCpu() { + minMaxHeapQueryGrouper = new MinMaxHeapQueryGrouper( + MetricType.CPU, + GroupingType.SIMILARITY, + AggregationType.AVERAGE, + topQueriesStore, + 10 + ); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord aggregatedRecord = null; + + Number expectedSum =
0; + int expectedCount = 0; + for (SearchQueryRecord record : records) { + expectedSum = expectedSum.longValue() + record.getMeasurement(MetricType.CPU).longValue(); + aggregatedRecord = minMaxHeapQueryGrouper.add(record); + expectedCount += 1; + } + + long expectedAverage = (long) expectedSum / expectedCount; + assertEquals(expectedAverage, aggregatedRecord.getMeasurement(MetricType.CPU)); + } + + public void testAddMeasurementNoneAggregationCpu() { + minMaxHeapQueryGrouper = new MinMaxHeapQueryGrouper( + MetricType.CPU, + GroupingType.SIMILARITY, + AggregationType.NONE, + topQueriesStore, + 10 + ); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord lastRecord = null; + + Number expectedValue = 0; + for (SearchQueryRecord record : records) { + expectedValue = record.getMeasurement(MetricType.CPU).longValue(); + lastRecord = minMaxHeapQueryGrouper.add(record); + } + + assertEquals(expectedValue, lastRecord.getMeasurement(MetricType.CPU)); + } + + public void testNoneGroupingTypeIllegalArgumentException() { + minMaxHeapQueryGrouper = new MinMaxHeapQueryGrouper(MetricType.CPU, GroupingType.NONE, AggregationType.NONE, topQueriesStore, 10); + int numOfRecords = 10; + List<SearchQueryRecord> records = QueryInsightsTestUtils.generateQueryInsightRecords(numOfRecords, AggregationType.NONE); + + // Set all records to have the same hashcode for aggregation + QueryInsightsTestUtils.populateSameQueryHashcodes(records); + SearchQueryRecord aggregatedRecord = null; + + Number expectedSum = 0; + for (SearchQueryRecord record : records) { + expectedSum = expectedSum.longValue() + record.getMeasurement(MetricType.CPU).longValue(); + assertThrows(IllegalArgumentException.class, () -> { minMaxHeapQueryGrouper.add(record); }); + } + } + + // New query group not existing added to
MIN + public void testNewGroupAddedToMin() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 2, + MetricType.LATENCY, + List.of(1000L, 1100L) + ); + + for (List<SearchQueryRecord> recordList : allRecords) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(1000L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1100L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // New query group not existing added to MIN and overflows to MAX + public void testNewGroupOverflowsMinToMax() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 2, + MetricType.LATENCY, + List.of(1000L, 1100L, 900L) + ); + + for (List<SearchQueryRecord> recordList : allRecords) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(1000L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1100L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // New query group not existing added to MIN and causes other group to overflow to MAX + public void testNewGroupCausesOtherGroupOverflowMinToMax() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 2, + MetricType.LATENCY, + List.of(1000L, 1100L, 1200L) + ); + + for (List<SearchQueryRecord> recordList : allRecords) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(1100L,
topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1200L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // Existing query group update to MIN increases average + public void testExistingGroupUpdateToMinIncreaseAverage() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(1100L, 1200L, 1000L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(1300L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(1200L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1200L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // Existing query group update to MIN decrease average - stay in MIN + public void testExistingGroupUpdateToMinDecreaseAverageStayInMin() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(1100L, 600L, 1000L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(700L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(900L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1000L,
topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // Existing query group update to MIN decrease average - overflows to MAX + public void testExistingGroupUpdateToMinDecreaseAverageOverflowsToMax() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(1199L, 1100L, 1000L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(1L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(1000L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1100L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // Existing query group update to MAX increases average - stay in MAX + public void testExistingGroupUpdateToMaxIncreaseAverageStayInMax() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(900L, 975L, 950L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(920L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(950L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(975L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // Existing query group update to MAX
increases average - promote to MIN + public void testExistingGroupUpdateToMaxIncreaseAveragePromoteToMin() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(900L, 975L, 950L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(1100L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(975L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1000L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + // Existing query group update to MAX decrease average + public void testExistingGroupUpdateToMaxDecreaseAverage() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(900L, 975L, 950L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(800L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(950L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(975L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + } + + public void testSwitchGroupingTypeToNone() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 =
QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(900L, 975L, 950L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(800L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(950L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(975L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + + minMaxHeapQueryGrouper.setGroupingType(GroupingType.NONE); + assertEquals(0, minMaxHeapQueryGrouper.numberOfTopGroups()); + + assertThrows(IllegalArgumentException.class, () -> { minMaxHeapQueryGrouper.add(allRecords1.get(0).get(0)); }); + } + + public void testMultipleQueryGroupsUpdates() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 2); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(900L, 1000L, 1000L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(800L, 400L, 1200L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(2, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(850L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(1100L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(3, minMaxHeapQueryGrouper.numberOfGroups()); + } + + public void testMaxGroupLimitReached() { + minMaxHeapQueryGrouper = getQueryGroupingService(AggregationType.AVERAGE, 1); + +
minMaxHeapQueryGrouper.setMaxGroups(1); + + List<List<SearchQueryRecord>> allRecords1 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(900L, 1000L, 1000L) + ); + + List<List<SearchQueryRecord>> allRecords2 = QueryInsightsTestUtils.generateMultipleQueryInsightsRecordsWithMeasurement( + 1, + MetricType.LATENCY, + List.of(800L, 400L, 1200L) + ); + + allRecords1.addAll(allRecords2); + + for (List<SearchQueryRecord> recordList : allRecords1) { + for (SearchQueryRecord record : recordList) { + minMaxHeapQueryGrouper.add(record); + } + } + + assertEquals(1, minMaxHeapQueryGrouper.numberOfTopGroups()); + assertEquals(850L, topQueriesStore.poll().getMeasurement(MetricType.LATENCY)); + assertEquals(2, minMaxHeapQueryGrouper.numberOfGroups()); + } + + private MinMaxHeapQueryGrouper getQueryGroupingService(AggregationType aggregationType, int topNSize) { + return new MinMaxHeapQueryGrouper(MetricType.LATENCY, GroupingType.SIMILARITY, aggregationType, topQueriesStore, topNSize); + } +} diff --git a/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java b/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java index 08c9c8a..dbce9f6 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java @@ -41,7 +41,7 @@ public void testSerialize() throws Exception { public void testToXContent() throws IOException { char[] expectedXcontent = - "{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"latency\":1}]}" + "{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}" .toCharArray(); TopQueries topQueries =
QueryInsightsTestUtils.createFixedTopQueries(); ClusterName clusterName = new ClusterName("test-cluster");