Skip to content

Commit

Permalink
Improve the rejection logic for soft mode query groups during node du…
Browse files Browse the repository at this point in the history
…ress (opensearch-project#16417)

* improve the rejection logic for wlm

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add CHANGELOG

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed Oct 22, 2024
1 parent 3312eda commit a8fac13
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
- [Workload Management] Enhance rejection mechanism in workload management ([#16417](https://github.com/opensearch-project/OpenSearch/pull/16417))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ public void rejectIfNeeded(String queryGroupId) {
return;
}

// rejections will not happen for SOFT mode QueryGroups
// rejections will not happen for SOFT mode QueryGroups unless node is in duress
Optional<QueryGroup> optionalQueryGroup = activeQueryGroups.stream().filter(x -> x.get_id().equals(queryGroupId)).findFirst();

if (optionalQueryGroup.isPresent() && optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT)
return;
if (optionalQueryGroup.isPresent()
&& (optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT
&& !nodeDuressTrackers.isNodeInDuress())) return;

optionalQueryGroup.ifPresent(queryGroup -> {
boolean reject = false;
Expand Down
81 changes: 67 additions & 14 deletions server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.mockito.Mockito.when;

public class QueryGroupServiceTests extends OpenSearchTestCase {
public static final String QUERY_GROUP_ID = "queryGroupId1";
private QueryGroupService queryGroupService;
private QueryGroupTaskCancellationService mockCancellationService;
private ClusterService mockClusterService;
Expand All @@ -68,6 +69,7 @@ public void setUp() throws Exception {
mockNodeDuressTrackers = Mockito.mock(NodeDuressTrackers.class);
mockCancellationService = Mockito.mock(TestQueryGroupCancellationService.class);
mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor();
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(false);

queryGroupService = new QueryGroupService(
mockCancellationService,
Expand Down Expand Up @@ -203,26 +205,52 @@ public void testRejectIfNeeded_whenQueryGroupIdIsNullOrDefaultOne() {
verify(spyMap, never()).get(any());
}

public void testRejectIfNeeded_whenSoftModeQueryGroupIsContendedAndNodeInDuress() {
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
"testQueryGroup",
QUERY_GROUP_ID,
MutableQueryGroupFragment.ResiliencyMode.SOFT,
Map.of(ResourceType.CPU, 0.10)
);
mockQueryGroupStateMap = new HashMap<>();
mockQueryGroupStateMap.put("queryGroupId1", new QueryGroupState());
QueryGroupState state = new QueryGroupState();
QueryGroupState.ResourceTypeState cpuResourceState = new QueryGroupState.ResourceTypeState(ResourceType.CPU);
cpuResourceState.setLastRecordedUsage(0.10);
state.getResourceState().put(ResourceType.CPU, cpuResourceState);
QueryGroupState spyState = spy(state);
mockQueryGroupStateMap.put(QUERY_GROUP_ID, spyState);

mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);

queryGroupService = new QueryGroupService(
mockCancellationService,
mockClusterService,
mockThreadPool,
mockWorkloadManagementSettings,
mockNodeDuressTrackers,
mockQueryGroupsStateAccessor,
activeQueryGroups,
new HashSet<>()
);
when(mockWorkloadManagementSettings.getWlmMode()).thenReturn(WlmMode.ENABLED);
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(true);
assertThrows(OpenSearchRejectedExecutionException.class, () -> queryGroupService.rejectIfNeeded("queryGroupId1"));
}

public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
QueryGroup testQueryGroup = new QueryGroup(
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
"testQueryGroup",
"queryGroupId1",
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.10)),
1L
QUERY_GROUP_ID,
MutableQueryGroupFragment.ResiliencyMode.SOFT,
Map.of(ResourceType.CPU, 0.10)
);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
{
add(testQueryGroup);
}
};
mockQueryGroupStateMap = new HashMap<>();
QueryGroupState spyState = spy(new QueryGroupState());
mockQueryGroupStateMap.put("queryGroupId1", spyState);

mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);

Map<String, QueryGroupState> spyMap = spy(mockQueryGroupStateMap);

queryGroupService = new QueryGroupService(
mockCancellationService,
mockClusterService,
Expand All @@ -239,11 +267,11 @@ public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
}

public void testRejectIfNeeded_whenQueryGroupIsEnforcedMode_andNotBreaching() {
QueryGroup testQueryGroup = new QueryGroup(
QueryGroup testQueryGroup = getQueryGroup(
"testQueryGroup",
"queryGroupId1",
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.10)),
1L
MutableQueryGroupFragment.ResiliencyMode.ENFORCED,
Map.of(ResourceType.CPU, 0.10)
);
QueryGroup spuQueryGroup = spy(testQueryGroup);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
Expand Down Expand Up @@ -464,6 +492,31 @@ public void testShouldSBPHandle() {

}

private static Set<QueryGroup> getActiveQueryGroups(
String name,
String id,
MutableQueryGroupFragment.ResiliencyMode mode,
Map<ResourceType, Double> resourceLimits
) {
QueryGroup testQueryGroup = getQueryGroup(name, id, mode, resourceLimits);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
{
add(testQueryGroup);
}
};
return activeQueryGroups;
}

private static QueryGroup getQueryGroup(
String name,
String id,
MutableQueryGroupFragment.ResiliencyMode mode,
Map<ResourceType, Double> resourceLimits
) {
QueryGroup testQueryGroup = new QueryGroup(name, id, new MutableQueryGroupFragment(mode, resourceLimits), 1L);
return testQueryGroup;
}

// This is needed to test the behavior of QueryGroupService#doRun method
static class TestQueryGroupCancellationService extends QueryGroupTaskCancellationService {
public TestQueryGroupCancellationService(
Expand Down

0 comments on commit a8fac13

Please sign in to comment.