Skip to content

Commit

Permalink
Top n queries historical queries from local index and time range (#84)
Browse files Browse the repository at this point in the history
* Added support for historical top n queries

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Handle indexing errors and accurate search parsing

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Correctly filter in window queries when from and to exist

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Remove search requests from top n queries

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Removed comments

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Comments for new functions

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Comments for new functions

Signed-off-by: Emily Guo <emilyguo@amazon.com>

Updated comments

Signed-off-by: Emily Guo <emilyguo@amazon.com>

* Added unit tests

Signed-off-by: Emily Guo <emilyguo@amazon.com>

* Update LocalIndexExporter.java

Signed-off-by: Emily Guo <35637792+LilyCaroline17@users.noreply.github.com>

* Update LocalIndexReaderTests.java

Signed-off-by: Emily Guo <35637792+LilyCaroline17@users.noreply.github.com>

* Update QueryInsightsReaderFactoryTests.java

Signed-off-by: Emily Guo <35637792+LilyCaroline17@users.noreply.github.com>

* Address comments, change getTimeRange into getFrom and getTo, apply filters to historical, and rename functions

Signed-off-by: Emily Guo <LilyCaroline1717@gmail.com>

* Fix test cases

Signed-off-by: Emily Guo <LilyCaroline1717@gmail.com>

---------

Signed-off-by: Emily Guo <emilyguo@amazon.com>
Signed-off-by: Emily Guo <35637792+LilyCaroline17@users.noreply.github.com>
Signed-off-by: Emily Guo <LilyCaroline1717@gmail.com>
  • Loading branch information
LilyCaroline17 authored Sep 5, 2024
1 parent ca0d999 commit 4d896cc
Show file tree
Hide file tree
Showing 19 changed files with 953 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public Collection<Object> createComponents(
clusterService.getClusterSettings(),
threadPool,
client,
metricsRegistry
metricsRegistry,
xContentRegistry
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.reader;

import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;

/**
* Local index reader for reading query insights data from local OpenSearch indices.
*/
public final class LocalIndexReader implements QueryInsightsReader {
/**
* Logger of the local index reader
*/
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;
private final NamedXContentRegistry namedXContentRegistry;

/**
* Constructor of LocalIndexReader
*
* @param client OS client
* @param indexPattern the pattern of index to read from
* @param namedXContentRegistry for parsing purposes
*/
public LocalIndexReader(final Client client, final DateTimeFormatter indexPattern, final NamedXContentRegistry namedXContentRegistry) {
this.indexPattern = indexPattern;
this.client = client;
this.namedXContentRegistry = namedXContentRegistry;
}

/**
* Getter of indexPattern
*
* @return indexPattern
*/
public DateTimeFormatter getIndexPattern() {
return indexPattern;
}

/**
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexReader
*/
public LocalIndexReader setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
* Export a list of SearchQueryRecord from local index
*
* @param from start timestamp
* @param to end timestamp
* @return list of SearchQueryRecords whose timestamps fall between from and to
*/
@Override
public List<SearchQueryRecord> read(final String from, final String to) {
List<SearchQueryRecord> records = new ArrayList<>();
if (from == null || to == null) {
return records;
}
final DateTime start = DateTime.parse(from);
DateTime end = DateTime.parse(to);
if (end.compareTo(DateTime.now(DateTimeZone.UTC)) > 0) {
end = DateTime.now(DateTimeZone.UTC);
}
DateTime curr = start;
while (curr.compareTo(end.plusDays(1).withTimeAtStartOfDay()) < 0) {
String index = getDateTimeFromFormat(curr);
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp").from(start.getMillis()).to(end.getMillis());
QueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).mustNot(excludeQuery);
searchSourceBuilder.query(query);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest).actionGet();
for (SearchHit hit : searchResponse.getHits()) {
SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry);
records.add(record);
}
} catch (IndexNotFoundException ignored) {} catch (Exception e) {
logger.error("Unable to parse search hit: ", e);
}
curr = curr.plusDays(1);

}
return records;
}

/**
* Close the reader sink
*/
@Override
public void close() {
logger.debug("Closing the LocalIndexReader..");
}

private String getDateTimeFromFormat(DateTime current) {
return indexPattern.print(current);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.reader;

import java.io.Closeable;
import java.util.List;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

/**
* Base interface for Query Insights readers
*/
public interface QueryInsightsReader extends Closeable {
/**
* Reader a list of SearchQueryRecord
*
* @param from string
* @param to string
* @return List of SearchQueryRecord
*/
List<SearchQueryRecord> read(final String from, final String to);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.reader;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;

import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.format.DateTimeFormat;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.NamedXContentRegistry;

/**
* Factory class for validating and creating Readers based on provided settings
*/
public class QueryInsightsReaderFactory {
/**
* Logger of the query insights Reader factory
*/
private final Logger logger = LogManager.getLogger();
final private Client client;
final private Set<QueryInsightsReader> Readers;

/**
* Constructor of QueryInsightsReaderFactory
*
* @param client OS client
*/
public QueryInsightsReaderFactory(final Client client) {
this.client = client;
this.Readers = new HashSet<>();
}

/**
* Validate Reader sink config
*
* @param settings Reader sink config {@link Settings}
* @throws IllegalArgumentException if provided Reader sink config settings are invalid
*/
public void validateReaderConfig(final Settings settings) throws IllegalArgumentException {
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
if (indexPattern.isEmpty()) {
throw new IllegalArgumentException("Empty index pattern configured for the Reader");
}
try {
DateTimeFormat.forPattern(indexPattern);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the Reader", indexPattern)
);
}
}

/**
* Create a Reader based on provided parameters
*
* @param indexPattern the index pattern if creating an index Reader
* @param namedXContentRegistry for parsing purposes
* @return QueryInsightsReader the created Reader
*/
public QueryInsightsReader createReader(String indexPattern, NamedXContentRegistry namedXContentRegistry) {
QueryInsightsReader Reader = new LocalIndexReader(client, DateTimeFormat.forPattern(indexPattern), namedXContentRegistry);
this.Readers.add(Reader);
return Reader;
}

/**
* Update a Reader based on provided parameters
*
* @param Reader The Reader to update
* @param indexPattern the index pattern if creating an index Reader
* @return QueryInsightsReader the updated Reader sink
*/
public QueryInsightsReader updateReader(QueryInsightsReader Reader, String indexPattern) {
if (Reader.getClass() == LocalIndexReader.class) {
((LocalIndexReader) Reader).setIndexPattern(DateTimeFormat.forPattern(indexPattern));
}
return Reader;
}

/**
* Close a Reader
*
* @param Reader the Reader to close
*/
public void closeReader(QueryInsightsReader Reader) throws IOException {
if (Reader != null) {
Reader.close();
this.Readers.remove(Reader);
}
}

/**
* Close all Readers
*
*/
public void closeAllReaders() {
for (QueryInsightsReader Reader : Readers) {
try {
closeReader(Reader);
} catch (IOException e) {
logger.error("Fail to close query insights Reader, error: ", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Query Insights reader
*/
package org.opensearch.plugin.insights.core.reader;
Loading

0 comments on commit 4d896cc

Please sign in to comment.