Skip to content

Commit

Permalink
Adding cache eviction and listener for invalidating index field type … (
Browse files Browse the repository at this point in the history
#142) (#143)

* Adding cache eviction and listener for invalidating index field type mappings on index deletion/update



* Fixing spotless violations



* Fixing code to use .index instead of .findMappings



* Fixing spotless violations



* Addressing review comments



* Fixing existing test failures



* Fixing existing test failures



---------


(cherry picked from commit fb24118)

Signed-off-by: Ankit Jain <akjain@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 83ca682 commit acabdc7
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.service.categorizer;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.plugin.insights.settings.QueryCategorizationSettings;

/**
* Cache implementation specifically for maintaining the field name type mappings
* for indices that are part of successful search requests
*/
public class IndicesFieldTypeCache {

private static final Logger logger = LogManager.getLogger(IndicesFieldTypeCache.class);
private final Cache<Index, IndexFieldMap> cache;

public IndicesFieldTypeCache(Settings settings) {
final long sizeInBytes = QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY.get(settings).getBytes();
CacheBuilder<Index, IndexFieldMap> cacheBuilder = CacheBuilder.<Index, IndexFieldMap>builder();
if (sizeInBytes > 0) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher((k, v) -> RamUsageEstimator.sizeOfObject(k) + v.weight());
}
cache = cacheBuilder.build();
}

public IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
logger.error("Unexpected execution exception while initializing for index " + index);
}

// Should never return null as the ExecutionException is only thrown
// if loader throws an exception or returns a null value, which cannot
// be the case in this scenario
return null;
}

public void invalidate(Index index) {
cache.invalidate(index);
}

public Iterable<Index> keySet() {
return cache.keys();
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
weight = new CounterMetric();
}

public String get(String fieldName) {
return fieldTypeMap.get(fieldName);
}

public void putIfAbsent(String key, String value) {
// Increment the weight only if the key value pair added to the Map
if (fieldTypeMap.putIfAbsent(key, value) == null) {
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
}
}

public long weight() {
return weight.count();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@

package org.opensearch.plugin.insights.core.service.categorizer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.BytesRef;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.hash.MurmurHash3;
import org.opensearch.core.common.io.stream.NamedWriteable;
Expand All @@ -33,16 +34,36 @@
/**
* Class to generate query shape
*/
public class QueryShapeGenerator {
public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final ConcurrentHashMap<Index, ConcurrentHashMap<String, String>> fieldTypeMap;
private final IndicesFieldTypeCache indicesFieldTypeCache;

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
this.fieldTypeMap = new ConcurrentHashMap<>();
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
}

public void clusterChanged(ClusterChangedEvent event) {
final List<Index> indicesDeleted = event.indicesDeleted();
for (Index index : indicesDeleted) {
// remove the deleted index mapping from field type cache
indicesFieldTypeCache.invalidate(index);
}

if (event.metadataChanged()) {
final Metadata previousMetadata = event.previousState().metadata();
final Metadata currentMetadata = event.state().metadata();
for (Index index : indicesFieldTypeCache.keySet()) {
if (previousMetadata.index(index) != currentMetadata.index(index)) {
// remove the updated index mapping from field type cache
indicesFieldTypeCache.invalidate(index);
}
}
}
}

/**
Expand Down Expand Up @@ -127,20 +148,12 @@ public String buildShape(
}

private Map<String, Object> getPropertiesMapForIndex(Index index) {
Map<String, MappingMetadata> indexMapping;
try {
indexMapping = clusterService.state().metadata().findMappings(new String[] { index.getName() }, input -> str -> true);
} catch (IOException e) {
// If an error occurs while retrieving mappings, return an empty map
return Collections.emptyMap();
}

MappingMetadata mappingMetadata = indexMapping.get(index.getName());
if (mappingMetadata == null) {
IndexMetadata indexMetadata = clusterService.state().metadata().index(index);
if (indexMetadata == null) {
return Collections.emptyMap();
}

Map<String, Object> propertiesMap = (Map<String, Object>) mappingMetadata.getSourceAsMap().get("properties");
Map<String, Object> propertiesMap = (Map<String, Object>) indexMetadata.mapping().getSourceAsMap().get("properties");
if (propertiesMap == null) {
return Collections.emptyMap();
}
Expand Down Expand Up @@ -363,8 +376,7 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
fieldType = getFieldTypeFromProperties(fieldName, propertiesAsMap);

// Cache field type or NO_FIELD_TYPE_VALUE if not found
fieldTypeMap.computeIfAbsent(index, k -> new ConcurrentHashMap<>())
.putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);
indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);

return fieldType;
}
Expand Down Expand Up @@ -406,6 +418,6 @@ else if (currentMap.containsKey("type")) {
}

String getFieldTypeFromCache(String fieldName, Index index) {
return fieldTypeMap.getOrDefault(index, new ConcurrentHashMap<>()).get(fieldName);
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.plugin.insights.settings;

import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeValue;

/**
* Settings for Query Categorization
Expand All @@ -24,6 +25,12 @@ public class QueryCategorizationSettings {
Setting.Property.Dynamic
);

public static final Setting<ByteSizeValue> SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY = Setting.memorySizeSetting(
"search.query.fieldtype.cache.size",
"0.1%",
Setting.Property.NodeScope
);

/**
* Default constructor
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public void testGetSettings() {
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
),
queryInsightsPlugin.getSettings()
);
Expand Down
Loading

0 comments on commit acabdc7

Please sign in to comment.