Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Top n queries historical queries from local index and time range #84

Merged
merged 10 commits into from
Sep 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public Collection<Object> createComponents(
clusterService.getClusterSettings(),
threadPool,
client,
metricsRegistry
metricsRegistry,
xContentRegistry
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.reader;

import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;

/**
* Local index reader for reading query insights data from local OpenSearch indices.
*/
public final class LocalIndexReader implements QueryInsightsReader {
/**
* Logger of the local index reader
*/
private final Logger logger = LogManager.getLogger();
private final Client client;
private DateTimeFormatter indexPattern;
private final NamedXContentRegistry namedXContentRegistry;

/**
* Constructor of LocalIndexReader
*
* @param client OS client
* @param indexPattern the pattern of index to read from
* @param namedXContentRegistry for parsing purposes
*/
public LocalIndexReader(final Client client, final DateTimeFormatter indexPattern, final NamedXContentRegistry namedXContentRegistry) {
this.indexPattern = indexPattern;
this.client = client;
this.namedXContentRegistry = namedXContentRegistry;
}

/**
* Getter of indexPattern
*
* @return indexPattern
*/
public DateTimeFormatter getIndexPattern() {
return indexPattern;
}

/**
* Setter of indexPattern
*
* @param indexPattern index pattern
* @return the current LocalIndexReader
*/
public LocalIndexReader setIndexPattern(DateTimeFormatter indexPattern) {
this.indexPattern = indexPattern;
return this;
}

/**
* Export a list of SearchQueryRecord from local index
*
* @param from start timestamp
* @param to end timestamp
* @return list of SearchQueryRecords whose timestamps fall between from and to
*/
@Override
public List<SearchQueryRecord> read(final String from, final String to) {
List<SearchQueryRecord> records = new ArrayList<>();
if (from == null || to == null) {
return records;
}
final DateTime start = DateTime.parse(from);
DateTime end = DateTime.parse(to);
if (end.compareTo(DateTime.now(DateTimeZone.UTC)) > 0) {
end = DateTime.now(DateTimeZone.UTC);
}
DateTime curr = start;
while (curr.compareTo(end.plusDays(1).withTimeAtStartOfDay()) < 0) {
Copy link
Member

@ansjcy ansjcy Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: We are making assumptions on the index pattern that it will rollover every day. We should add this assumption with a check in the configuration API as well. And we should not allow user to change this rollover period and only allow them to change the prefix pattern. In that case

MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");

would become something like

MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", TOP_N_QUERIES_INDEX_PATTERN + "*");

String index = getDateTimeFromFormat(curr);
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
MatchQueryBuilder excludeQuery = QueryBuilders.matchQuery("indices", "top_queries*");
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp").from(start.getMillis()).to(end.getMillis());
QueryBuilder query = QueryBuilders.boolQuery().must(rangeQuery).mustNot(excludeQuery);
searchSourceBuilder.query(query);
searchRequest.source(searchSourceBuilder);
try {
SearchResponse searchResponse = client.search(searchRequest).actionGet();
for (SearchHit hit : searchResponse.getHits()) {
SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry);
records.add(record);
}
} catch (IndexNotFoundException ignored) {} catch (Exception e) {
logger.error("Unable to parse search hit: ", e);
}
curr = curr.plusDays(1);

}
return records;
}

/**
* Close the reader sink
*/
@Override
public void close() {
logger.debug("Closing the LocalIndexReader..");
}

private String getDateTimeFromFormat(DateTime current) {
return indexPattern.print(current);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.reader;

import java.io.Closeable;
import java.util.List;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

/**
* Base interface for Query Insights readers
*/
public interface QueryInsightsReader extends Closeable {
/**
* Reader a list of SearchQueryRecord
*
* @param from string
* @param to string
* @return List of SearchQueryRecord
*/
List<SearchQueryRecord> read(final String from, final String to);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.reader;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;

import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.format.DateTimeFormat;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.NamedXContentRegistry;

/**
* Factory class for validating and creating Readers based on provided settings
*/
public class QueryInsightsReaderFactory {
/**
* Logger of the query insights Reader factory
*/
private final Logger logger = LogManager.getLogger();
final private Client client;
final private Set<QueryInsightsReader> Readers;

/**
* Constructor of QueryInsightsReaderFactory
*
* @param client OS client
*/
public QueryInsightsReaderFactory(final Client client) {
this.client = client;
this.Readers = new HashSet<>();
}

/**
* Validate Reader sink config
*
* @param settings Reader sink config {@link Settings}
* @throws IllegalArgumentException if provided Reader sink config settings are invalid
*/
public void validateReaderConfig(final Settings settings) throws IllegalArgumentException {
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
if (indexPattern.isEmpty()) {
throw new IllegalArgumentException("Empty index pattern configured for the Reader");
}
try {
DateTimeFormat.forPattern(indexPattern);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the Reader", indexPattern)
);
}
}

/**
* Create a Reader based on provided parameters
*
* @param indexPattern the index pattern if creating an index Reader
* @param namedXContentRegistry for parsing purposes
* @return QueryInsightsReader the created Reader
*/
public QueryInsightsReader createReader(String indexPattern, NamedXContentRegistry namedXContentRegistry) {
QueryInsightsReader Reader = new LocalIndexReader(client, DateTimeFormat.forPattern(indexPattern), namedXContentRegistry);
this.Readers.add(Reader);
return Reader;
}

/**
* Update a Reader based on provided parameters
*
* @param Reader The Reader to update
* @param indexPattern the index pattern if creating an index Reader
* @return QueryInsightsReader the updated Reader sink
*/
public QueryInsightsReader updateReader(QueryInsightsReader Reader, String indexPattern) {
if (Reader.getClass() == LocalIndexReader.class) {
((LocalIndexReader) Reader).setIndexPattern(DateTimeFormat.forPattern(indexPattern));
}
return Reader;
}

/**
* Close a Reader
*
* @param Reader the Reader to close
*/
public void closeReader(QueryInsightsReader Reader) throws IOException {
if (Reader != null) {
Reader.close();
this.Readers.remove(Reader);
}
}

/**
* Close all Readers
*
*/
public void closeAllReaders() {
for (QueryInsightsReader Reader : Readers) {
try {
closeReader(Reader);
} catch (IOException e) {
logger.error("Fail to close query insights Reader, error: ", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Query Insights reader
*/
package org.opensearch.plugin.insights.core.reader;
Loading
Loading