Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve the rejection logic for wlm #16417

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix multi-search with template doesn't return status code ([#16265](https://github.com/opensearch-project/OpenSearch/pull/16265))
- [Streaming Indexing] Fix intermittent 'The bulk request must be terminated by a newline [\n]' failures ([#16337](https://github.com/opensearch-project/OpenSearch/pull/16337))
- Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331))
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)
- [Workload Management] Make query groups persistent across process restarts ([#16370](https://github.com/opensearch-project/OpenSearch/pull/16370))
- [Workload Management] Enhance rejection mechanism in workload management ([#16417](https://github.com/opensearch-project/OpenSearch/pull/16417))
- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ public void rejectIfNeeded(String queryGroupId) {
return;
}

// rejections will not happen for SOFT mode QueryGroups
// rejections will not happen for SOFT mode QueryGroups unless node is in duress
Optional<QueryGroup> optionalQueryGroup = activeQueryGroups.stream().filter(x -> x.get_id().equals(queryGroupId)).findFirst();

if (optionalQueryGroup.isPresent() && optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT)
return;
if (optionalQueryGroup.isPresent()
&& (optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT
&& !nodeDuressTrackers.isNodeInDuress())) return;

optionalQueryGroup.ifPresent(queryGroup -> {
boolean reject = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.mockito.Mockito.when;

public class QueryGroupServiceTests extends OpenSearchTestCase {
public static final String QUERY_GROUP_ID = "queryGroupId1";
private QueryGroupService queryGroupService;
private QueryGroupTaskCancellationService mockCancellationService;
private ClusterService mockClusterService;
Expand All @@ -68,6 +69,7 @@ public void setUp() throws Exception {
mockNodeDuressTrackers = Mockito.mock(NodeDuressTrackers.class);
mockCancellationService = Mockito.mock(TestQueryGroupCancellationService.class);
mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor();
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(false);

queryGroupService = new QueryGroupService(
mockCancellationService,
Expand Down Expand Up @@ -203,26 +205,52 @@ public void testRejectIfNeeded_whenQueryGroupIdIsNullOrDefaultOne() {
verify(spyMap, never()).get(any());
}

/**
 * Checks that {@code rejectIfNeeded} throws {@link OpenSearchRejectedExecutionException} for a
 * SOFT mode query group when the node duress tracker reports duress.
 *
 * <p>The query group is configured with a CPU limit of 0.10 and its last recorded CPU usage is
 * set to the same value (0.10).
 */
public void testRejectIfNeeded_whenSoftModeQueryGroupIsContendedAndNodeInDuress() {
    Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
        "testQueryGroup",
        QUERY_GROUP_ID,
        MutableQueryGroupFragment.ResiliencyMode.SOFT,
        Map.of(ResourceType.CPU, 0.10)
    );

    // Build the per-group state with a recorded CPU usage equal to the configured limit.
    QueryGroupState state = new QueryGroupState();
    QueryGroupState.ResourceTypeState cpuResourceState = new QueryGroupState.ResourceTypeState(ResourceType.CPU);
    cpuResourceState.setLastRecordedUsage(0.10);
    state.getResourceState().put(ResourceType.CPU, cpuResourceState);
    QueryGroupState spyState = spy(state);

    // Register the state exactly once, under the same id the query group was created with.
    mockQueryGroupStateMap = new HashMap<>();
    mockQueryGroupStateMap.put(QUERY_GROUP_ID, spyState);

    mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);

    queryGroupService = new QueryGroupService(
        mockCancellationService,
        mockClusterService,
        mockThreadPool,
        mockWorkloadManagementSettings,
        mockNodeDuressTrackers,
        mockQueryGroupsStateAccessor,
        activeQueryGroups,
        new HashSet<>()
    );

    when(mockWorkloadManagementSettings.getWlmMode()).thenReturn(WlmMode.ENABLED);
    // Stub the duress tracker to report duress for this test.
    when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(true);

    assertThrows(OpenSearchRejectedExecutionException.class, () -> queryGroupService.rejectIfNeeded(QUERY_GROUP_ID));
}

public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
QueryGroup testQueryGroup = new QueryGroup(
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
"testQueryGroup",
"queryGroupId1",
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.10)),
1L
QUERY_GROUP_ID,
MutableQueryGroupFragment.ResiliencyMode.SOFT,
Map.of(ResourceType.CPU, 0.10)
);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
{
add(testQueryGroup);
}
};
mockQueryGroupStateMap = new HashMap<>();
QueryGroupState spyState = spy(new QueryGroupState());
mockQueryGroupStateMap.put("queryGroupId1", spyState);

mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);

Map<String, QueryGroupState> spyMap = spy(mockQueryGroupStateMap);

queryGroupService = new QueryGroupService(
mockCancellationService,
mockClusterService,
Expand All @@ -239,11 +267,11 @@ public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
}

public void testRejectIfNeeded_whenQueryGroupIsEnforcedMode_andNotBreaching() {
QueryGroup testQueryGroup = new QueryGroup(
QueryGroup testQueryGroup = getQueryGroup(
"testQueryGroup",
"queryGroupId1",
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.10)),
1L
MutableQueryGroupFragment.ResiliencyMode.ENFORCED,
Map.of(ResourceType.CPU, 0.10)
);
QueryGroup spuQueryGroup = spy(testQueryGroup);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
Expand Down Expand Up @@ -464,6 +492,31 @@ public void testShouldSBPHandle() {

}

/**
 * Creates a mutable set containing a single {@link QueryGroup} built from the given attributes.
 *
 * @param name           name passed to the query group
 * @param id             id passed to the query group
 * @param mode           resiliency mode of the query group
 * @param resourceLimits per-resource limits of the query group
 * @return a new {@link HashSet} holding exactly the one created query group
 */
private static Set<QueryGroup> getActiveQueryGroups(
    String name,
    String id,
    MutableQueryGroupFragment.ResiliencyMode mode,
    Map<ResourceType, Double> resourceLimits
) {
    // A plain HashSet avoids the anonymous subclass that double-brace initialization creates.
    Set<QueryGroup> activeQueryGroups = new HashSet<>();
    activeQueryGroups.add(getQueryGroup(name, id, mode, resourceLimits));
    return activeQueryGroups;
}

/**
 * Builds a {@link QueryGroup} for tests from the given attributes.
 *
 * @param name           name passed to the query group
 * @param id             id passed to the query group
 * @param mode           resiliency mode placed in the mutable fragment
 * @param resourceLimits per-resource limits placed in the mutable fragment
 * @return the newly constructed query group (the last constructor argument is fixed at {@code 1L})
 */
private static QueryGroup getQueryGroup(
    String name,
    String id,
    MutableQueryGroupFragment.ResiliencyMode mode,
    Map<ResourceType, Double> resourceLimits
) {
    MutableQueryGroupFragment fragment = new MutableQueryGroupFragment(mode, resourceLimits);
    return new QueryGroup(name, id, fragment, 1L);
}

// This is needed to test the behavior of QueryGroupService#doRun method
static class TestQueryGroupCancellationService extends QueryGroupTaskCancellationService {
public TestQueryGroupCancellationService(
Expand Down
Loading