diff --git a/CHANGELOG.md b/CHANGELOG.md index 198ba432e237d..66f9e8d3697a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix bug in SBP cancellation logic ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13474)) - Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379)) - Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086)) - Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155)) diff --git a/server/src/main/java/org/opensearch/search/ResourceType.java b/server/src/main/java/org/opensearch/search/ResourceType.java new file mode 100644 index 0000000000000..5bbcd7de1c2ce --- /dev/null +++ b/server/src/main/java/org/opensearch/search/ResourceType.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +/** + * Enum to hold the resource type + */ +public enum ResourceType { + CPU("cpu"), + JVM("jvm"); + + private final String name; + + ResourceType(String name) { + this.name = name; + } + + /** + * The string match here is case-sensitive + * @param s name matching the resource type name + * @return a {@link ResourceType} + */ + public static ResourceType fromName(String s) { + for (ResourceType resourceType : values()) { + if (resourceType.getName().equals(s)) { + return resourceType; + } + } + throw new IllegalArgumentException("Unknown resource type: [" + s + "]"); + } + + private String getName() { + return name; + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index fac1ac319087e..c51b3ae452be1 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -18,6 +18,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.monitor.process.ProcessProbe; +import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -28,9 +29,11 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.NodeDuressTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.SearchBackpressureTask; import org.opensearch.tasks.Task; @@ -44,11 +47,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.EnumMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.DoubleSupplier; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -62,7 +67,14 @@ */ public class SearchBackpressureService extends AbstractLifecycleComponent implements TaskCompletionListener { private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class); - + private static final Map> trackerApplyConditions = Map.of( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (nodeDuressTrackers) -> nodeDuressTrackers.isResourceInDuress(ResourceType.CPU), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + (nodeDuressTrackers) -> isHeapTrackingSupported() && nodeDuressTrackers.isResourceInDuress(ResourceType.JVM), + TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER, + (nodeDuressTrackers) -> true + ); private volatile Scheduler.Cancellable scheduledFuture; private final SearchBackpressureSettings settings; @@ -70,8 +82,8 @@ public class SearchBackpressureService extends AbstractLifecycleComponent implem private final ThreadPool threadPool; private final LongSupplier timeNanosSupplier; - private final List nodeDuressTrackers; - private final Map, List> taskTrackers; + private final NodeDuressTrackers nodeDuressTrackers; + private final Map, TaskResourceUsageTrackers> taskTrackers; private final Map, SearchBackpressureState> searchBackpressureStates; private final TaskManager taskManager; @@ -82,19 +94,26 @@ public SearchBackpressureService( ThreadPool threadPool, TaskManager taskManager ) { - this( - settings, - taskResourceTrackingService, - threadPool, - System::nanoTime, - List.of( - new NodeDuressTracker( - () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings().getCpuThreshold() - ), - new NodeDuressTracker( - () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold() - ) - ), + this(settings, taskResourceTrackingService, threadPool, System::nanoTime, new NodeDuressTrackers(new EnumMap<>(ResourceType.class) { + { + put( + ResourceType.CPU, + new NodeDuressTracker( + () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings() + .getCpuThreshold(), + () -> settings.getNodeDuressSettings().getNumSuccessiveBreaches() + ) + ); + put( + ResourceType.JVM, + new NodeDuressTracker( + () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings() + .getHeapThreshold(), + () -> settings.getNodeDuressSettings().getNumSuccessiveBreaches() + ) + ); + } + }), getTrackers( settings.getSearchTaskSettings()::getCpuTimeNanosThreshold, settings.getSearchTaskSettings()::getHeapVarianceThreshold, @@ -117,14 +136,14 @@ public SearchBackpressureService( ); } - public SearchBackpressureService( + SearchBackpressureService( SearchBackpressureSettings settings, TaskResourceTrackingService taskResourceTrackingService, ThreadPool threadPool, LongSupplier timeNanosSupplier, - List nodeDuressTrackers, - List searchTaskTrackers, - List searchShardTaskTrackers, + NodeDuressTrackers nodeDuressTrackers, + TaskResourceUsageTrackers searchTaskTrackers, + TaskResourceUsageTrackers searchShardTaskTrackers, TaskManager taskManager ) { this.settings = settings; @@ -163,40 +182,48 @@ void doRun() { return; } - if (isNodeInDuress() == false) { + if (nodeDuressTrackers.isNodeInDuress() == false) { return; } List searchTasks = getTaskByType(SearchTask.class); List searchShardTasks = getTaskByType(SearchShardTask.class); - List cancellableTasks = new ArrayList<>(); + + boolean isHeapUsageDominatedBySearchTasks = isHeapUsageDominatedBySearch( + searchTasks, + getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold() + ); + boolean isHeapUsageDominatedBySearchShardTasks = isHeapUsageDominatedBySearch( + searchShardTasks, + getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold() + ); + final Map, List> cancellableTasks = Map.of( + SearchTask.class, + isHeapUsageDominatedBySearchTasks ? searchTasks : Collections.emptyList(), + SearchShardTask.class, + isHeapUsageDominatedBySearchShardTasks ? searchShardTasks : Collections.emptyList() + ); // Force-refresh usage stats of these tasks before making a cancellation decision. taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0])); taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0])); - // Check if increase in heap usage is due to SearchTasks - if (HeapUsageTracker.isHeapUsageDominatedBySearch( - searchTasks, - getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold() - )) { - cancellableTasks.addAll(searchTasks); - } + List taskCancellations = new ArrayList<>(); - // Check if increase in heap usage is due to SearchShardTasks - if (HeapUsageTracker.isHeapUsageDominatedBySearch( - searchShardTasks, - getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold() - )) { - cancellableTasks.addAll(searchShardTasks); + for (TaskResourceUsageTrackerType trackerType : TaskResourceUsageTrackerType.values()) { + if (shouldApply(trackerType)) { + addResourceTrackerBasedCancellations(trackerType, taskCancellations, cancellableTasks); + } } - // none of the task type is breaching the heap usage thresholds and hence we do not cancel any tasks - if (cancellableTasks.isEmpty()) { - return; - } + // Since these cancellations might be duplicate due to multiple trackers causing cancellation for same task + // We need to merge them + taskCancellations = mergeTaskCancellations(taskCancellations).stream() + .map(this::addSBPStateUpdateCallback) + .filter(TaskCancellation::isEligibleForCancellation) + .collect(Collectors.toList()); - for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) { + for (TaskCancellation taskCancellation : taskCancellations) { logger.warn( "[{} mode] cancelling task [{}] due to high resource consumption [{}]", mode.getName(), @@ -226,6 +253,66 @@ void doRun() { } } + /** + * Had to define this method to help mock this static method to test the scenario where SearchTraffic should not be + * penalised when not breaching the threshold + * @param searchTasks inFlight co-ordinator requests + * @param threshold miniumum jvm allocated bytes ratio w.r.t. available heap + * @return a boolean value based on whether the threshold is breached + */ + boolean isHeapUsageDominatedBySearch(List searchTasks, double threshold) { + return HeapUsageTracker.isHeapUsageDominatedBySearch(searchTasks, threshold); + } + + private TaskCancellation addSBPStateUpdateCallback(TaskCancellation taskCancellation) { + CancellableTask task = taskCancellation.getTask(); + Runnable toAddCancellationCallbackForSBPState = searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount; + if (task instanceof SearchTask) { + toAddCancellationCallbackForSBPState = searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount; + } + List newOnCancelCallbacks = new ArrayList<>(taskCancellation.getOnCancelCallbacks()); + newOnCancelCallbacks.add(toAddCancellationCallbackForSBPState); + return new TaskCancellation(task, taskCancellation.getReasons(), newOnCancelCallbacks); + } + + private boolean shouldApply(TaskResourceUsageTrackerType trackerType) { + return trackerApplyConditions.get(trackerType).apply(nodeDuressTrackers); + } + + private List addResourceTrackerBasedCancellations( + TaskResourceUsageTrackerType type, + List taskCancellations, + Map, List> cancellableTasks + ) { + for (Map.Entry, TaskResourceUsageTrackers> taskResourceUsageTrackers : taskTrackers + .entrySet()) { + final Optional taskResourceUsageTracker = taskResourceUsageTrackers.getValue().getTracker(type); + final Class taskType = taskResourceUsageTrackers.getKey(); + + taskResourceUsageTracker.ifPresent( + tracker -> taskCancellations.addAll(tracker.getTaskCancellations(cancellableTasks.get(taskType))) + ); + } + + return taskCancellations; + } + + /** + * Method to reduce the taskCancellations into unique bunch + * @param taskCancellations all task cancellations + * @return unique task cancellations + */ + private List mergeTaskCancellations(final List taskCancellations) { + final Map uniqueTaskCancellations = new HashMap<>(); + + for (TaskCancellation taskCancellation : taskCancellations) { + final long taskId = taskCancellation.getTask().getId(); + uniqueTaskCancellations.put(taskId, uniqueTaskCancellations.getOrDefault(taskId, taskCancellation).merge(taskCancellation)); + } + + return new ArrayList<>(uniqueTaskCancellations.values()); + } + /** * Given a task, returns the type of the task */ @@ -243,16 +330,7 @@ Class getTaskType(Task task) { * Returns true if the node is in duress consecutively for the past 'n' observations. */ boolean isNodeInDuress() { - boolean isNodeInDuress = false; - int numSuccessiveBreaches = getSettings().getNodeDuressSettings().getNumSuccessiveBreaches(); - - for (NodeDuressTracker tracker : nodeDuressTrackers) { - if (tracker.check() >= numSuccessiveBreaches) { - isNodeInDuress = true; // not breaking the loop so that each tracker's streak gets updated. - } - } - - return isNodeInDuress; + return nodeDuressTrackers.isNodeInDuress(); } /* @@ -271,39 +349,6 @@ List getTa .collect(Collectors.toUnmodifiableList()); } - /** - * Returns a TaskCancellation wrapper containing the list of reasons (possibly zero), along with an overall - * cancellation score for the given task. Cancelling a task with a higher score has better chance of recovering the - * node from duress. - */ - TaskCancellation getTaskCancellation(CancellableTask task) { - List reasons = new ArrayList<>(); - List callbacks = new ArrayList<>(); - Class taskType = getTaskType(task); - List trackers = taskTrackers.get(taskType); - for (TaskResourceUsageTracker tracker : trackers) { - Optional reason = tracker.checkAndMaybeGetCancellationReason(task); - if (reason.isPresent()) { - callbacks.add(tracker::incrementCancellations); - reasons.add(reason.get()); - } - } - callbacks.add(searchBackpressureStates.get(taskType)::incrementCancellationCount); - - return new TaskCancellation(task, reasons, callbacks); - } - - /** - * Returns a list of TaskCancellations sorted by descending order of their cancellation scores. - */ - List getTaskCancellations(List tasks) { - return tasks.stream() - .map(this::getTaskCancellation) - .filter(TaskCancellation::isEligibleForCancellation) - .sorted(Comparator.reverseOrder()) - .collect(Collectors.toUnmodifiableList()); - } - SearchBackpressureSettings getSettings() { return settings; } @@ -315,7 +360,7 @@ SearchBackpressureState getSearchBackpressureState(Class getTrackers( + public static TaskResourceUsageTrackers getTrackers( LongSupplier cpuThresholdSupplier, DoubleSupplier heapVarianceSupplier, DoubleSupplier heapPercentThresholdSupplier, @@ -324,23 +369,27 @@ public static List getTrackers( ClusterSettings clusterSettings, Setting windowSizeSetting ) { - List trackers = new ArrayList<>(); - trackers.add(new CpuUsageTracker(cpuThresholdSupplier)); + TaskResourceUsageTrackers trackers = new TaskResourceUsageTrackers(); + trackers.addTracker(new CpuUsageTracker(cpuThresholdSupplier), TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); if (isHeapTrackingSupported()) { - trackers.add( + trackers.addTracker( new HeapUsageTracker( heapVarianceSupplier, heapPercentThresholdSupplier, heapMovingAverageWindowSize, clusterSettings, windowSizeSetting - ) + ), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER ); } else { logger.warn("heap size couldn't be determined"); } - trackers.add(new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime)); - return Collections.unmodifiableList(trackers); + trackers.addTracker( + new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime), + TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER + ); + return trackers; } @Override @@ -360,8 +409,8 @@ public void onTaskCompleted(Task task) { } List exceptions = new ArrayList<>(); - List trackers = taskTrackers.get(taskType); - for (TaskResourceUsageTracker tracker : trackers) { + TaskResourceUsageTrackers trackers = taskTrackers.get(taskType); + for (TaskResourceUsageTracker tracker : trackers.all()) { try { tracker.update(task); } catch (Exception e) { @@ -400,6 +449,7 @@ public SearchBackpressureStats nodeStats() { searchBackpressureStates.get(SearchTask.class).getCancellationCount(), searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(), taskTrackers.get(SearchTask.class) + .all() .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks))) ); @@ -408,6 +458,7 @@ public SearchBackpressureStats nodeStats() { searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(), searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(), taskTrackers.get(SearchShardTask.class) + .all() .stream() .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks))) ); diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java index 678c19d83fb96..6c76f94f5c940 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java @@ -17,8 +17,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import java.io.IOException; import java.util.Map; diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java index 302350104bd3a..279f239703f3c 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java +++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java @@ -17,8 +17,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import java.io.IOException; import java.util.Map; diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 507953cb4a20e..a303b625f4b59 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -12,6 +12,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -34,7 +35,30 @@ public class CpuUsageTracker extends TaskResourceUsageTracker { private final LongSupplier thresholdSupplier; public CpuUsageTracker(LongSupplier thresholdSupplier) { + this(thresholdSupplier, (task) -> { + long usage = task.getTotalResourceStats().getCpuTimeInNanos(); + long threshold = thresholdSupplier.getAsLong(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "cpu usage exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + }); + } + + public CpuUsageTracker(LongSupplier thresholdSupplier, ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) { this.thresholdSupplier = thresholdSupplier; + this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator; } @Override @@ -42,27 +66,6 @@ public String name() { return CPU_USAGE_TRACKER.getName(); } - @Override - public Optional checkAndMaybeGetCancellationReason(Task task) { - long usage = task.getTotalResourceStats().getCpuTimeInNanos(); - long threshold = thresholdSupplier.getAsLong(); - - if (usage < threshold) { - return Optional.empty(); - } - - return Optional.of( - new TaskCancellation.Reason( - "cpu usage exceeded [" - + new TimeValue(usage, TimeUnit.NANOSECONDS) - + " >= " - + new TimeValue(threshold, TimeUnit.NANOSECONDS) - + "]", - 1 // TODO: fine-tune the cancellation score/weight - ) - ); - } - @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index f1e8abe7e3230..216947315cd2d 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -12,6 +12,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -34,8 +35,35 @@ public class ElapsedTimeTracker extends TaskResourceUsageTracker { private final LongSupplier timeNanosSupplier; public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) { + this(thresholdSupplier, timeNanosSupplier, (Task task) -> { + long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); + long threshold = thresholdSupplier.getAsLong(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "elapsed time exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + }); + } + + public ElapsedTimeTracker( + LongSupplier thresholdSupplier, + LongSupplier timeNanosSupplier, + ResourceUsageBreachEvaluator resourceUsageBreachEvaluator + ) { this.thresholdSupplier = thresholdSupplier; this.timeNanosSupplier = timeNanosSupplier; + this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator; } @Override @@ -43,27 +71,6 @@ public String name() { return ELAPSED_TIME_TRACKER.getName(); } - @Override - public Optional checkAndMaybeGetCancellationReason(Task task) { - long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); - long threshold = thresholdSupplier.getAsLong(); - - if (usage < threshold) { - return Optional.empty(); - } - - return Optional.of( - new TaskCancellation.Reason( - "elapsed time exceeded [" - + new TimeValue(usage, TimeUnit.NANOSECONDS) - + " >= " - + new TimeValue(threshold, TimeUnit.NANOSECONDS) - + "]", - 1 // TODO: fine-tune the cancellation score/weight - ) - ); - } - @Override public TaskResourceUsageTracker.Stats stats(List activeTasks) { long now = timeNanosSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index 56b9f947f6e37..c69de8ce21f89 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -18,6 +18,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -55,6 +56,43 @@ public HeapUsageTracker( this.heapPercentThresholdSupplier = heapPercentThresholdSupplier; this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize)); clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize); + setDefaultResourceUsageBreachEvaluator(); + } + + /** + * Had to refactor this method out of the constructor as we can't pass a lambda which references a member variable in constructor + * error: cannot reference movingAverageReference before supertype constructor has been called + */ + private void setDefaultResourceUsageBreachEvaluator() { + this.resourceUsageBreachEvaluator = (task) -> { + MovingAverage movingAverage = movingAverageReference.get(); + + // There haven't been enough measurements. + if (movingAverage.isReady() == false) { + return Optional.empty(); + } + + double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); + double averageUsage = movingAverage.getAverage(); + double variance = heapVarianceSupplier.getAsDouble(); + double allowedUsage = averageUsage * variance; + double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES; + + if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "heap usage exceeded [" + + new ByteSizeValue((long) currentUsage) + + " >= " + + new ByteSizeValue((long) allowedUsage) + + "]", + (int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight + ) + ); + }; } @Override @@ -67,33 +105,6 @@ public void update(Task task) { movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes()); } - @Override - public Optional checkAndMaybeGetCancellationReason(Task task) { - MovingAverage movingAverage = movingAverageReference.get(); - - // There haven't been enough measurements. - if (movingAverage.isReady() == false) { - return Optional.empty(); - } - - double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); - double averageUsage = movingAverage.getAverage(); - double variance = heapVarianceSupplier.getAsDouble(); - double allowedUsage = averageUsage * variance; - double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES; - - if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) { - return Optional.empty(); - } - - return Optional.of( - new TaskCancellation.Reason( - "heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]", - (int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight - ) - ); - } - private void updateWindowSize(int heapMovingAverageWindowSize) { this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java deleted file mode 100644 index 8e35c724a8fef..0000000000000 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.backpressure.trackers; - -import org.opensearch.common.util.Streak; - -import java.util.function.BooleanSupplier; - -/** - * NodeDuressTracker is used to check if the node is in duress. - * - * @opensearch.internal - */ -public class NodeDuressTracker { - /** - * Tracks the number of consecutive breaches. - */ - private final Streak breaches = new Streak(); - - /** - * Predicate that returns true when the node is in duress. - */ - private final BooleanSupplier isNodeInDuress; - - public NodeDuressTracker(BooleanSupplier isNodeInDuress) { - this.isNodeInDuress = isNodeInDuress; - } - - /** - * Evaluates the predicate and returns the number of consecutive breaches. - */ - public int check() { - return breaches.record(isNodeInDuress.getAsBoolean()); - } -} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java new file mode 100644 index 0000000000000..ae60a82fc2816 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.util.Streak; +import org.opensearch.search.ResourceType; + +import java.util.Map; +import java.util.function.BooleanSupplier; +import java.util.function.IntSupplier; + +/** + * NodeDuressTrackers is used to check if the node is in duress based on various resources. + * + * @opensearch.internal + */ +public class NodeDuressTrackers { + private final Map duressTrackers; + + public NodeDuressTrackers(Map duressTrackers) { + this.duressTrackers = duressTrackers; + } + + /** + * Method to check the {@link ResourceType} in duress + * @return Boolean + */ + public boolean isResourceInDuress(ResourceType resourceType) { + return duressTrackers.get(resourceType).test(); + } + + /** + * Method to evaluate whether the node is in duress or not + * @return true if node is in duress because of either system resource + */ + public boolean isNodeInDuress() { + for (ResourceType resourceType : ResourceType.values()) { + if (isResourceInDuress(resourceType)) { + return true; + } + } + return false; + } + + /** + * NodeDuressTracker is used to check if the node is in duress + * @opensearch.internal + */ + public static class NodeDuressTracker { + /** + * Tracks the number of consecutive breaches. + */ + private final Streak breaches = new Streak(); + + /** + * Predicate that returns true when the node is in duress. + */ + private final BooleanSupplier isNodeInDuress; + + /** + * Predicate that returns the max number of breaches allowed for this resource before we mark it as in duress + */ + private final IntSupplier maxBreachAllowedSupplier; + + public NodeDuressTracker(BooleanSupplier isNodeInDuress, IntSupplier maxBreachAllowedSupplier) { + this.isNodeInDuress = isNodeInDuress; + this.maxBreachAllowedSupplier = maxBreachAllowedSupplier; + } + + /** + * Returns true if the node is in duress consecutively for the past 'n' observations. + */ + public boolean test() { + return breaches.record(isNodeInDuress.getAsBoolean()) >= maxBreachAllowedSupplier.getAsInt(); + } + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java deleted file mode 100644 index ce15e9e9b6622..0000000000000 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.search.backpressure.trackers; - -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskCancellation; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; - -/** - * TaskResourceUsageTracker is used to track completions and cancellations of search related tasks. - * - * @opensearch.internal - */ -public abstract class TaskResourceUsageTracker { - /** - * Counts the number of cancellations made due to this tracker. - */ - private final AtomicLong cancellations = new AtomicLong(); - - public long incrementCancellations() { - return cancellations.incrementAndGet(); - } - - public long getCancellations() { - return cancellations.get(); - } - - /** - * Returns a unique name for this tracker. - */ - public abstract String name(); - - /** - * Notifies the tracker to update its state when a task execution completes. - */ - public void update(Task task) {} - - /** - * Returns the cancellation reason for the given task, if it's eligible for cancellation. - */ - public abstract Optional checkAndMaybeGetCancellationReason(Task task); - - /** - * Returns the tracker's state for tasks as seen in the stats API. - */ - public abstract Stats stats(List activeTasks); - - /** - * Represents the tracker's state as seen in the stats API. - */ - public interface Stats extends ToXContentObject, Writeable {} -} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java new file mode 100644 index 0000000000000..3b0072288681c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +/** + * TaskResourceUsageTrackers is used to hold all the {@link TaskResourceUsageTracker} objects. + * + * @opensearch.internal + */ +public class TaskResourceUsageTrackers { + private final EnumMap all; + + public TaskResourceUsageTrackers() { + all = new EnumMap<>(TaskResourceUsageTrackerType.class); + } + + /** + * adds the tracker for the TrackerType + * @param tracker is {@link TaskResourceUsageTracker} implementation which will be added + * @param trackerType is {@link TaskResourceUsageTrackerType} which depicts the implementation type + */ + public void addTracker(final TaskResourceUsageTracker tracker, final TaskResourceUsageTrackerType trackerType) { + all.put(trackerType, tracker); + } + + /** + * getter for tracker for a {@link TaskResourceUsageTrackerType} + * @param type for which the implementation is returned + * @return the {@link TaskResourceUsageTrackerType} + */ + public Optional getTracker(TaskResourceUsageTrackerType type) { + return Optional.ofNullable(all.get(type)); + } + + /** + * Method to access all available {@link TaskResourceUsageTracker} + * @return all enabled and available {@link TaskResourceUsageTracker}s + */ + public List all() { + return new ArrayList<>(all.values()); + } + + /** + * TaskResourceUsageTracker is used to track completions and cancellations of search related tasks. + * @opensearch.internal + */ + public static abstract class TaskResourceUsageTracker { + /** + * Counts the number of cancellations made due to this tracker. + */ + private final AtomicLong cancellations = new AtomicLong(); + protected ResourceUsageBreachEvaluator resourceUsageBreachEvaluator; + + /** + * for test purposes only + * @param resourceUsageBreachEvaluator which suggests whether a task should be cancelled or not + */ + public void setResourceUsageBreachEvaluator(final ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) { + this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator; + } + + public long incrementCancellations() { + return cancellations.incrementAndGet(); + } + + public long getCancellations() { + return cancellations.get(); + } + + /** + * Returns a unique name for this tracker. + */ + public abstract String name(); + + /** + * Notifies the tracker to update its state when a task execution completes. + */ + public void update(Task task) {} + + /** + * Returns the cancellation reason for the given task, if it's eligible for cancellation. + */ + public Optional checkAndMaybeGetCancellationReason(Task task) { + return resourceUsageBreachEvaluator.evaluate(task); + } + + /** + * Returns the tracker's state for tasks as seen in the stats API. + */ + public abstract Stats stats(List activeTasks); + + /** + * Method to get taskCancellations due to this tracker for the given {@link CancellableTask} tasks + * @param tasks cancellation eligible tasks due to node duress and search traffic threshold breach + * @return the list of tasks which are breaching task level thresholds for this {@link TaskResourceUsageTracker} + */ + public List getTaskCancellations(List tasks) { + return tasks.stream() + .map(task -> this.getTaskCancellation(task, List.of(this::incrementCancellations))) + .filter(TaskCancellation::isEligibleForCancellation) + .collect(Collectors.toList()); + } + + private TaskCancellation getTaskCancellation(final CancellableTask task, final List cancellationCallback) { + Optional reason = checkAndMaybeGetCancellationReason(task); + List reasons = new ArrayList<>(); + reason.ifPresent(reasons::add); + + return new TaskCancellation(task, reasons, cancellationCallback); + } + + /** + * Represents the tracker's state as seen in the stats API. + */ + public interface Stats extends ToXContentObject, Writeable {} + + /** + * This interface carries the logic to decide whether a task should be cancelled or not + */ + public interface ResourceUsageBreachEvaluator { + /** + * evaluates whether the task is eligible for cancellation based on {@link TaskResourceUsageTracker} implementation + * @param task is input to this method on which the cancellation evaluation is performed + * @return a {@link TaskCancellation.Reason} why this task should be cancelled otherwise empty + */ + public Optional evaluate(final Task task); + } + } +} diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java index 2d152e513f197..872f5b79bb205 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java @@ -42,10 +42,25 @@ public List getReasons() { return reasons; } + public List getOnCancelCallbacks() { + return onCancelCallbacks; + } + public String getReasonString() { return reasons.stream().map(Reason::getMessage).collect(Collectors.joining(", ")); } + public TaskCancellation merge(final TaskCancellation other) { + if (other == this) { + return this; + } + final List newReasons = new ArrayList<>(reasons); + newReasons.addAll(other.getReasons()); + final List newOnCancelCallbacks = new ArrayList<>(onCancelCallbacks); + newOnCancelCallbacks.addAll(other.onCancelCallbacks); + return new TaskCancellation(task, newReasons, newOnCancelCallbacks); + } + /** * Cancels the task and invokes all onCancelCallbacks. */ diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index f0d930c4c3acb..26ec76d5b7f50 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.ResourceType; import org.opensearch.search.backpressure.settings.SearchBackpressureMode; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; @@ -23,9 +24,11 @@ import org.opensearch.search.backpressure.stats.SearchBackpressureStats; import org.opensearch.search.backpressure.stats.SearchShardTaskStats; import org.opensearch.search.backpressure.stats.SearchTaskStats; -import org.opensearch.search.backpressure.trackers.NodeDuressTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; @@ -42,6 +45,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,10 +56,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; +import static org.opensearch.search.ResourceType.CPU; +import static org.opensearch.search.ResourceType.JVM; import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyDouble; +import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -89,8 +97,15 @@ public void testIsNodeInDuress() { AtomicReference cpuUsage = new AtomicReference<>(); AtomicReference heapUsage = new AtomicReference<>(); - NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5); - NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5); + NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3); + NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 3); + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, heapUsageTracker); + put(ResourceType.CPU, cpuUsageTracker); + } + }; SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -102,9 +117,9 @@ public void testIsNodeInDuress() { mockTaskResourceTrackingService, threadPool, System::nanoTime, - List.of(cpuUsageTracker, heapUsageTracker), - Collections.emptyList(), - Collections.emptyList(), + new NodeDuressTrackers(duressTrackers), + new TaskResourceUsageTrackers(), + new TaskResourceUsageTrackers(), taskManager ); @@ -132,6 +147,8 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234); TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -143,15 +160,15 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - Collections.emptyList(), - List.of(mockTaskResourceUsageTracker), - Collections.emptyList(), + new NodeDuressTrackers(new EnumMap<>(ResourceType.class)), + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), taskManager ); for (int i = 0; i < 100; i++) { // service.onTaskCompleted(new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>())); - service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200)); + service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200, i)); } assertEquals(100, service.getSearchBackpressureState(SearchTask.class).getCompletionCount()); verify(mockTaskResourceUsageTracker, times(100)).update(any()); @@ -161,6 +178,8 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234); TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); SearchBackpressureSettings settings = new SearchBackpressureSettings( Settings.EMPTY, @@ -172,16 +191,16 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - Collections.emptyList(), - Collections.emptyList(), - List.of(mockTaskResourceUsageTracker), + new NodeDuressTrackers(new EnumMap<>(ResourceType.class)), + new TaskResourceUsageTrackers(), + taskResourceUsageTrackers, taskManager ); // Record task completions to update the tracker state. Tasks other than SearchTask & SearchShardTask are ignored. - service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200)); + service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200, 101)); for (int i = 0; i < 100; i++) { - service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200)); + service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200, i)); } assertEquals(100, service.getSearchBackpressureState(SearchShardTask.class).getCompletionCount()); verify(mockTaskResourceUsageTracker, times(100)).update(any()); @@ -192,21 +211,41 @@ public void testSearchTaskInFlightCancellation() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); AtomicLong mockTime = new AtomicLong(0); LongSupplier mockTimeNanosSupplier = mockTime::get; - NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); + NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true, () -> 3); - TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(); + TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { + return Optional.empty(); + } + + return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + } + ); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); // Mocking 'settings' with predictable rate limiting thresholds. SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 5.0); + NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> false, () -> 3); + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, heapUsageTracker); + put(CPU, mockNodeDuressTracker); + } + }; + SearchBackpressureService service = new SearchBackpressureService( settings, mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - List.of(mockNodeDuressTracker), - List.of(mockTaskResourceUsageTracker), - Collections.emptyList(), + new NodeDuressTrackers(duressTrackers), + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), mockTaskManager ); @@ -225,9 +264,9 @@ public void testSearchTaskInFlightCancellation() { Map activeSearchTasks = new HashMap<>(); for (long i = 0; i < 75; i++) { if (i % 3 == 0) { - activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes)); + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes, i)); } else { - activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes)); + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes, i)); } } doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); @@ -264,9 +303,28 @@ public void testSearchShardTaskInFlightCancellation() { TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); AtomicLong mockTime = new AtomicLong(0); LongSupplier mockTimeNanosSupplier = mockTime::get; - NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true); + NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true, () -> 3); - TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(); + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, new NodeDuressTracker(() -> false, () -> 3)); + put(CPU, mockNodeDuressTracker); + } + }; + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + + TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { + return Optional.empty(); + } + + return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + } + ); + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); // Mocking 'settings' with predictable rate limiting thresholds. SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); @@ -276,9 +334,9 @@ public void testSearchShardTaskInFlightCancellation() { mockTaskResourceTrackingService, threadPool, mockTimeNanosSupplier, - List.of(mockNodeDuressTracker), - Collections.emptyList(), - List.of(mockTaskResourceUsageTracker), + nodeDuressTrackers, + new TaskResourceUsageTrackers(), + taskResourceUsageTrackers, mockTaskManager ); @@ -297,9 +355,9 @@ public void testSearchShardTaskInFlightCancellation() { Map activeSearchShardTasks = new HashMap<>(); for (long i = 0; i < 75; i++) { if (i % 5 == 0) { - activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes)); + activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes, i)); } else { - activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); + activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes, i)); } } doReturn(activeSearchShardTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); @@ -317,7 +375,7 @@ public void testSearchShardTaskInFlightCancellation() { // Simulate task completion to replenish some tokens. // This will add 2 tokens (task count delta * cancellationRatio) to 'rateLimitPerTaskCompletion'. for (int i = 0; i < 20; i++) { - service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes)); + service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes, i)); } service.doRun(); verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); @@ -333,6 +391,180 @@ public void testSearchShardTaskInFlightCancellation() { assertEquals(expectedStats, actualStats); } + public void testNonCancellationOfHeapBasedTasksWhenHeapNotInDuress() { + TaskManager mockTaskManager = spy(taskManager); + TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + AtomicLong mockTime = new AtomicLong(0); + LongSupplier mockTimeNanosSupplier = mockTime::get; + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, new NodeDuressTracker(() -> false, () -> 3)); + put(CPU, new NodeDuressTracker(() -> true, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + + // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the + // tasks but heap based cancellation should not happen because heap is not in duress + TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10)) + ); + TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) { + return Optional.empty(); + } + return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5)); + } + ); + + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(cpuUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); + taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER); + + // Mocking 'settings' with predictable rate limiting thresholds. + SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); + + SearchBackpressureService service = new SearchBackpressureService( + settings, + mockTaskResourceTrackingService, + threadPool, + mockTimeNanosSupplier, + nodeDuressTrackers, + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), + mockTaskManager + ); + + service.doRun(); + service.doRun(); + + SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class); + // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService + when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0); + when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings); + + // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). + Map activeSearchTasks = new HashMap<>(); + for (long i = 0; i < 75; i++) { + if (i % 5 == 0) { + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, 800, i)); + } else { + activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, 800, i)); + } + } + doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); + + // this will trigger cancellation but these cancellation should only be cpu based + service.doRun(); + verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(5, service.getSearchBackpressureState(SearchTask.class).getCancellationCount()); + assertEquals(1, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount()); + + SearchBackpressureStats expectedStats = new SearchBackpressureStats( + new SearchTaskStats( + 5, + 1, + Map.of( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + new MockStats(5), + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + new MockStats(0) + ) + ), + new SearchShardTaskStats(0, 0, Collections.emptyMap()), + SearchBackpressureMode.ENFORCED + ); + + SearchBackpressureStats actualStats = service.nodeStats(); + assertEquals(expectedStats, actualStats); + } + + public void testNonCancellationWhenSearchTrafficIsNotQualifyingForCancellation() { + TaskManager mockTaskManager = spy(taskManager); + TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class); + AtomicLong mockTime = new AtomicLong(0); + LongSupplier mockTimeNanosSupplier = mockTime::get; + + EnumMap duressTrackers = new EnumMap<>(ResourceType.class) { + { + put(JVM, new NodeDuressTracker(() -> false, () -> 3)); + put(CPU, new NodeDuressTracker(() -> true, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers); + + // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the + // tasks but heap based cancellation should not happen because heap is not in duress + TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER, + (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10)) + ); + TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, + (task) -> { + if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) { + return Optional.empty(); + } + return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5)); + } + ); + + TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers(); + taskResourceUsageTrackers.addTracker(cpuUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER); + taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER); + + // Mocking 'settings' with predictable rate limiting thresholds. + SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0); + + SearchBackpressureService service = spy( + new SearchBackpressureService( + settings, + mockTaskResourceTrackingService, + threadPool, + mockTimeNanosSupplier, + nodeDuressTrackers, + taskResourceUsageTrackers, + new TaskResourceUsageTrackers(), + mockTaskManager + ) + ); + + when(service.isHeapUsageDominatedBySearch(anyList(), anyDouble())).thenReturn(false); + + service.doRun(); + service.doRun(); + + SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class); + // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService + when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0); + when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings); + + // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). + Map activeSearchTasks = new HashMap<>(); + for (long i = 0; i < 75; i++) { + Class taskType = randomBoolean() ? SearchTask.class : SearchShardTask.class; + if (i % 5 == 0) { + activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 500, 800, i)); + } else { + activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 100, 800, i)); + } + } + doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks(); + + // this will trigger cancellation but the cancellation should not happen as the node is not is duress because of search traffic + service.doRun(); + + verify(mockTaskManager, times(0)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any()); + assertEquals(0, service.getSearchBackpressureState(SearchTask.class).getCancellationCount()); + assertEquals(0, service.getSearchBackpressureState(SearchShardTask.class).getCancellationCount()); + } + private SearchBackpressureSettings getBackpressureSettings(String mode, double ratio, double rate, double burst) { return spy( new SearchBackpressureSettings( @@ -342,11 +574,14 @@ private SearchBackpressureSettings getBackpressureSettings(String mode, double r ); } - private TaskResourceUsageTracker getMockedTaskResourceUsageTracker() { + private TaskResourceUsageTracker getMockedTaskResourceUsageTracker( + TaskResourceUsageTrackerType type, + TaskResourceUsageTracker.ResourceUsageBreachEvaluator evaluator + ) { return new TaskResourceUsageTracker() { @Override public String name() { - return TaskResourceUsageTrackerType.CPU_USAGE_TRACKER.getName(); + return type.getName(); } @Override @@ -354,11 +589,7 @@ public void update(Task task) {} @Override public Optional checkAndMaybeGetCancellationReason(Task task) { - if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) { - return Optional.empty(); - } - - return Optional.of(new TaskCancellation.Reason("limits exceeded", 5)); + return evaluator.evaluate(task); } @Override diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java index 6478fdfff61d4..89a743efa09d8 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java @@ -12,8 +12,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.test.AbstractWireSerializingTestCase; import java.util.Map; diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java index eb33bc1c37b7e..7117e6a33a5af 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java @@ -12,8 +12,8 @@ import org.opensearch.search.backpressure.trackers.CpuUsageTracker; import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; import org.opensearch.search.backpressure.trackers.HeapUsageTracker; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.test.AbstractWireSerializingTestCase; import java.util.Map; diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java index 8cdcbc7511bd2..0117b0ed71c27 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -33,7 +33,7 @@ public class CpuUsageTrackerTests extends OpenSearchTestCase { ); public void testSearchTaskEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200); + Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200, randomNonNegativeLong()); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); @@ -43,7 +43,7 @@ public void testSearchTaskEligibleForCancellation() { } public void testSearchShardTaskEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200, randomNonNegativeLong()); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); @@ -53,7 +53,7 @@ public void testSearchShardTaskEligibleForCancellation() { } public void testNotEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200, randomNonNegativeLong()); CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java index 921d01e7355a7..514f1b4785aa1 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -47,7 +47,7 @@ public void testSearchTaskEligibleForCancellation() { } public void testSearchShardTaskEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0, randomNonNegativeLong()); ElapsedTimeTracker tracker = new ElapsedTimeTracker( mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, () -> 200000000 @@ -60,7 +60,7 @@ public void testSearchShardTaskEligibleForCancellation() { } public void testNotEligibleForCancellation() { - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000, randomNonNegativeLong()); ElapsedTimeTracker tracker = new ElapsedTimeTracker( mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold, () -> 200000000 diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java index 3950d00b0c8b5..1c46305e9fda6 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -58,7 +58,7 @@ public void testSearchTaskEligibleForCancellation() { SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE ) ); - Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 50); + Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 50, randomNonNegativeLong()); // Record enough observations to make the moving average 'ready'. for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { @@ -66,7 +66,7 @@ public void testSearchTaskEligibleForCancellation() { } // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). - task = createMockTaskWithResourceStats(SearchTask.class, 1, 300); + task = createMockTaskWithResourceStats(SearchTask.class, 1, 300, randomNonNegativeLong()); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(6, reason.get().getCancellationScore()); @@ -88,7 +88,7 @@ public void testSearchShardTaskEligibleForCancellation() { SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE ) ); - Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50, randomNonNegativeLong()); // Record enough observations to make the moving average 'ready'. for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { @@ -96,7 +96,7 @@ public void testSearchShardTaskEligibleForCancellation() { } // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance). - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200); + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200, randomNonNegativeLong()); Optional reason = tracker.checkAndMaybeGetCancellationReason(task); assertTrue(reason.isPresent()); assertEquals(4, reason.get().getCancellationScore()); @@ -122,7 +122,7 @@ public void testNotEligibleForCancellation() { ); // Task with heap usage < heapBytesThreshold. - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99, randomNonNegativeLong()); // Not enough observations. reason = tracker.checkAndMaybeGetCancellationReason(task); @@ -139,7 +139,12 @@ public void testNotEligibleForCancellation() { // Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled. double allowedHeapUsage = 99.0 * 2.0; - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1)); + task = createMockTaskWithResourceStats( + SearchShardTask.class, + 1, + randomLongBetween(99, (long) allowedHeapUsage - 1), + randomNonNegativeLong() + ); reason = tracker.checkAndMaybeGetCancellationReason(task); assertFalse(reason.isPresent()); } @@ -148,12 +153,12 @@ public void testIsHeapUsageDominatedBySearch() { assumeTrue("Skip the test if the hardware doesn't support heap usage tracking", HeapUsageTracker.isHeapTrackingSupported()); // task with 1 byte of heap usage so that it does not breach the threshold - CancellableTask task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1); + CancellableTask task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, randomNonNegativeLong()); assertFalse(HeapUsageTracker.isHeapUsageDominatedBySearch(List.of(task), 0.5)); long totalHeap = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); // task with heap usage of [totalHeap - 1] so that it breaches the threshold - task = createMockTaskWithResourceStats(SearchShardTask.class, 1, totalHeap - 1); + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, totalHeap - 1, randomNonNegativeLong()); assertTrue(HeapUsageTracker.isHeapUsageDominatedBySearch(List.of(task), 0.5)); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java index 472ba95566523..32aca6ac3230e 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.atomic.AtomicReference; @@ -16,20 +17,20 @@ public class NodeDuressTrackerTests extends OpenSearchTestCase { public void testNodeDuressTracker() { AtomicReference cpuUsage = new AtomicReference<>(0.0); - NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5); + NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3); // Node not in duress. - assertEquals(0, tracker.check()); + assertFalse(tracker.test()); // Node in duress; the streak must keep increasing. cpuUsage.set(0.7); - assertEquals(1, tracker.check()); - assertEquals(2, tracker.check()); - assertEquals(3, tracker.check()); + assertFalse(tracker.test()); + assertFalse(tracker.test()); + assertTrue(tracker.test()); // Node not in duress anymore. cpuUsage.set(0.3); - assertEquals(0, tracker.check()); - assertEquals(0, tracker.check()); + assertFalse(tracker.test()); + assertFalse(tracker.test()); } } diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java new file mode 100644 index 0000000000000..2db251ee461db --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.search.ResourceType; +import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.EnumMap; + +public class NodeDuressTrackersTests extends OpenSearchTestCase { + + public void testNodeNotInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> false, () -> 2)); + put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 2)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + } + + public void testNodeInDuressWhenHeapInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> true, () -> 3)); + put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 1)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + + // for the third time it should be in duress + assertTrue(nodeDuressTrackers.isNodeInDuress()); + } + + public void testNodeInDuressWhenCPUInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> false, () -> 1)); + put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + + // for the third time it should be in duress + assertTrue(nodeDuressTrackers.isNodeInDuress()); + } + + public void testNodeInDuressWhenCPUAndHeapInDuress() { + EnumMap map = new EnumMap<>(ResourceType.class) { + { + put(ResourceType.JVM, new NodeDuressTracker(() -> true, () -> 3)); + put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 3)); + } + }; + + NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map); + + assertFalse(nodeDuressTrackers.isNodeInDuress()); + assertFalse(nodeDuressTrackers.isNodeInDuress()); + + // for the third time it should be in duress + assertTrue(nodeDuressTrackers.isNodeInDuress()); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java index e74f89c905499..f08c12ea258ca 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java @@ -9,7 +9,7 @@ package org.opensearch.tasks; import org.opensearch.action.search.SearchShardTask; -import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; +import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -69,7 +69,7 @@ public Optional checkAndMaybeGetCancellationReason(Task } @Override - public Stats stats(List activeTasks) { + public TaskResourceUsageTracker.Stats stats(List activeTasks) { return null; } }; diff --git a/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java b/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java index af06b1688dca2..8f31f2a60ea86 100644 --- a/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java +++ b/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java @@ -21,19 +21,21 @@ public class SearchBackpressureTestHelpers extends OpenSearchTestCase { - public static T createMockTaskWithResourceStats(Class type, long cpuUsage, long heapUsage) { - return createMockTaskWithResourceStats(type, cpuUsage, heapUsage, 0); + public static T createMockTaskWithResourceStats(Class type, long cpuUsage, long heapUsage, long taskId) { + return createMockTaskWithResourceStats(type, cpuUsage, heapUsage, 0, taskId); } public static T createMockTaskWithResourceStats( Class type, long cpuUsage, long heapUsage, - long startTimeNanos + long startTimeNanos, + long taskId ) { T task = mock(type); when(task.getTotalResourceStats()).thenReturn(new TaskResourceUsage(cpuUsage, heapUsage)); when(task.getStartTimeNanos()).thenReturn(startTimeNanos); + when(task.getId()).thenReturn(randomNonNegativeLong()); AtomicBoolean isCancelled = new AtomicBoolean(false); doAnswer(invocation -> {