Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding cache eviction and listener for invalidating index field type … #142

Merged
merged 7 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.categorizer.IndicesFieldTypeCache;
import org.opensearch.plugin.insights.rules.action.health_stats.HealthStatsAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.resthandler.health_stats.RestHealthStatsAction;
Expand Down Expand Up @@ -144,7 +145,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
IndicesFieldTypeCache.INDICES_FIELD_TYPE_CACHE_SIZE_KEY
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.opensearch.plugin.insights.core.service.categorizer;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;

jainankitk marked this conversation as resolved.
Show resolved Hide resolved
public class IndicesFieldTypeCache {

private static final Logger logger = LogManager.getLogger(IndicesFieldTypeCache.class);
public static final Setting<ByteSizeValue> INDICES_FIELD_TYPE_CACHE_SIZE_KEY = Setting.memorySizeSetting(
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
"search.insights.indices.fieldtype.cache.size",
new ByteSizeValue(-1),
Setting.Property.NodeScope
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
);
private final Cache<Index, IndexFieldMap> cache;

public IndicesFieldTypeCache(Settings settings) {
final long sizeInBytes = -1; // TODO: INDICES_FIELD_TYPE_CACHE_SIZE_KEY.get(settings).getBytes();
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
CacheBuilder<Index, IndexFieldMap> cacheBuilder = CacheBuilder.<Index, IndexFieldMap>builder();
if (sizeInBytes > 0) {
cacheBuilder.setMaximumWeight(sizeInBytes).weigher((k, v) -> RamUsageEstimator.sizeOfObject(k) + v.weight());
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
}
cache = cacheBuilder.build();
}

public IndexFieldMap getOrInitialize(Index index) {
try {
return cache.computeIfAbsent(index, k -> new IndexFieldMap());
} catch (ExecutionException ex) {
logger.error("Unexpected execution exception while initializing for index " + index);
}

return null;
}

public void invalidate(Index index) {
cache.invalidate(index);
}

public Iterable<Index> keySet() {
return cache.keys();
}

static class IndexFieldMap {
private ConcurrentHashMap<String, String> fieldTypeMap;
private CounterMetric weight;
jainankitk marked this conversation as resolved.
Show resolved Hide resolved

IndexFieldMap() {
fieldTypeMap = new ConcurrentHashMap<>();
weight = new CounterMetric();
}

public String get(String fieldName) {
return fieldTypeMap.get(fieldName);
}

public void putIfAbsent(String key, String value) {
// Increment the weight only if the key value got added to the Map
if (fieldTypeMap.putIfAbsent(key, value) == null) {
weight.inc(RamUsageEstimator.sizeOf(key) + RamUsageEstimator.sizeOf(value));
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
}
}

public long weight() {
return weight.count();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@

package org.opensearch.plugin.insights.core.service.categorizer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.BytesRef;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.hash.MurmurHash3;
import org.opensearch.core.common.io.stream.NamedWriteable;
Expand All @@ -33,16 +34,36 @@
/**
* Class to generate query shape
*/
public class QueryShapeGenerator {
public class QueryShapeGenerator implements ClusterStateListener {
static final String EMPTY_STRING = "";
static final String ONE_SPACE_INDENT = " ";
private final ClusterService clusterService;
private final String NO_FIELD_TYPE_VALUE = "";
private final ConcurrentHashMap<Index, ConcurrentHashMap<String, String>> fieldTypeMap;
private final IndicesFieldTypeCache indicesFieldTypeCache;

public QueryShapeGenerator(ClusterService clusterService) {
this.clusterService = clusterService;
this.fieldTypeMap = new ConcurrentHashMap<>();
clusterService.addListener(this);
this.indicesFieldTypeCache = new IndicesFieldTypeCache(clusterService.getSettings());
}

public void clusterChanged(ClusterChangedEvent event) {
final List<Index> indicesDeleted = event.indicesDeleted();
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
for (Index index : indicesDeleted) {
// remove the deleted index mapping from field type cache
indicesFieldTypeCache.invalidate(index);
}

if (event.metadataChanged()) {
final Metadata previousMetadata = event.previousState().metadata();
final Metadata currentMetadata = event.state().metadata();
for (Index index : indicesFieldTypeCache.keySet()) {
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
if (previousMetadata.index(index) != currentMetadata.index(index)) {
// remove the updated index mapping from field type cache
indicesFieldTypeCache.invalidate(index);
}
}
}
}

/**
Expand Down Expand Up @@ -127,20 +148,12 @@ public String buildShape(
}

private Map<String, Object> getPropertiesMapForIndex(Index index) {
Map<String, MappingMetadata> indexMapping;
try {
indexMapping = clusterService.state().metadata().findMappings(new String[] { index.getName() }, input -> str -> true);
} catch (IOException e) {
// If an error occurs while retrieving mappings, return an empty map
return Collections.emptyMap();
}

MappingMetadata mappingMetadata = indexMapping.get(index.getName());
if (mappingMetadata == null) {
IndexMetadata indexMetadata = clusterService.state().metadata().index(index);
jainankitk marked this conversation as resolved.
Show resolved Hide resolved
if (indexMetadata == null) {
return Collections.emptyMap();
}

Map<String, Object> propertiesMap = (Map<String, Object>) mappingMetadata.getSourceAsMap().get("properties");
Map<String, Object> propertiesMap = (Map<String, Object>) indexMetadata.mapping().getSourceAsMap().get("properties");
if (propertiesMap == null) {
return Collections.emptyMap();
}
Expand Down Expand Up @@ -363,8 +376,7 @@ String getFieldType(String fieldName, Map<String, Object> propertiesAsMap, Index
fieldType = getFieldTypeFromProperties(fieldName, propertiesAsMap);

// Cache field type or NO_FIELD_TYPE_VALUE if not found
fieldTypeMap.computeIfAbsent(index, k -> new ConcurrentHashMap<>())
.putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);
indicesFieldTypeCache.getOrInitialize(index).putIfAbsent(fieldName, fieldType != null ? fieldType : NO_FIELD_TYPE_VALUE);

return fieldType;
}
Expand Down Expand Up @@ -406,6 +418,6 @@ else if (currentMap.containsKey("type")) {
}

String getFieldTypeFromCache(String fieldName, Index index) {
return fieldTypeMap.getOrDefault(index, new ConcurrentHashMap<>()).get(fieldName);
return indicesFieldTypeCache.getOrInitialize(index).get(fieldName);
}
}
Loading
Loading