Skip to content

Commit

Permalink
[CCI] [BUG] Fixing extension settings update consumers (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#7456)

* add equels for ClusterSettings

Signed-off-by: Kuanysh <kuanysh4646@gmail.com>

* added junit

Signed-off-by: Kuanysh <kuanysh4646@gmail.com>

* code refactoring

Signed-off-by: Kuanysh <kuanysh4646@gmail.com>

* added changes to handleAddSettingsUpdateConsumer

Signed-off-by: Kuanysh <90975457+Kuanysh-kst@users.noreply.github.com>

* code refactoring

Signed-off-by: Kuanysh Aimurzinov <kuanysh4646@gmail.com>

* code refactoring

Signed-off-by: Kuanysh Aimurzinov <kuanysh4646@gmail.com>

* changed main method

Signed-off-by: Kuanysh <90975457+Kuanysh-kst@users.noreply.github.com>

---------

Signed-off-by: Kuanysh <kuanysh4646@gmail.com>
Signed-off-by: Kuanysh <90975457+Kuanysh-kst@users.noreply.github.com>
Signed-off-by: Kuanysh Aimurzinov <kuanysh4646@gmail.com>
  • Loading branch information
Kuanysh-kst authored and gaiksaya committed Jun 21, 2023
1 parent f2982ee commit c7ca638
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.settings.WriteableSetting;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;
Expand All @@ -31,6 +32,7 @@ public class AddSettingsUpdateConsumerRequestHandler {
private final ClusterService clusterService;
private final TransportService transportService;
private final String updateSettingsRequestType;
private final SettingsModule settingsModule;

/**
* Instantiates a new Add Settings Update Consumer Request Handler with the {@link ClusterService} and {@link TransportService}
Expand All @@ -42,11 +44,13 @@ public class AddSettingsUpdateConsumerRequestHandler {
public AddSettingsUpdateConsumerRequestHandler(
ClusterService clusterService,
TransportService transportService,
String updateSettingsRequestType
String updateSettingsRequestType,
SettingsModule settingsModule
) {
this.clusterService = clusterService;
this.transportService = transportService;
this.updateSettingsRequestType = updateSettingsRequestType;
this.settingsModule = settingsModule;
}

/**
Expand All @@ -68,25 +72,53 @@ TransportResponse handleAddSettingsUpdateConsumerRequest(AddSettingsUpdateConsum

// Extract setting and type from writeable setting
Setting<?> setting = extensionComponentSetting.getSetting();

// we need to get the actual setting from nodeSetting or indexsetting maps in SettingsModule
// use conditional based on setting properties
Setting<?> settingForUpdateConsumer = null;
if (setting.hasIndexScope()) {
settingForUpdateConsumer = settingsModule.getIndexScopedSettings().get(setting.getKey());
} else if (setting.hasNodeScope()) {
settingForUpdateConsumer = settingsModule.getClusterSettings().get(setting.getKey());
}
// do a null check and throw IllegalArgument exception here if neither index or node scope
if (settingForUpdateConsumer == null) {
throw new IllegalArgumentException("Not index or node scope");
}

WriteableSetting.SettingType settingType = extensionComponentSetting.getType();

// Register setting update consumer with callback method to extension
clusterService.getClusterSettings().addSettingsUpdateConsumer(setting, (data) -> {
logger.debug("Sending extension request type: " + updateSettingsRequestType);
UpdateSettingsResponseHandler updateSettingsResponseHandler = new UpdateSettingsResponseHandler();
transportService.sendRequest(
extensionNode,
updateSettingsRequestType,
new UpdateSettingsRequest(settingType, setting, data),
updateSettingsResponseHandler
);
});
if (setting.hasIndexScope()) {
// Register setting update consumer with callback method to extension
clusterService.getClusterSettings().addSettingsUpdateConsumer(settingForUpdateConsumer, (data) -> {
logger.debug("Sending extension request type: " + updateSettingsRequestType);
UpdateSettingsResponseHandler updateSettingsResponseHandler = new UpdateSettingsResponseHandler();
transportService.sendRequest(
extensionNode,
updateSettingsRequestType,
new UpdateSettingsRequest(settingType, setting, data),
updateSettingsResponseHandler
);
});
}
if (setting.hasNodeScope()) {
// Register setting update consumer with callback method to extension
clusterService.getClusterSettings().addSettingsUpdateConsumer(settingForUpdateConsumer, (data) -> {
logger.debug("Sending extension request type: " + updateSettingsRequestType);
UpdateSettingsResponseHandler updateSettingsResponseHandler = new UpdateSettingsResponseHandler();
transportService.sendRequest(
extensionNode,
updateSettingsRequestType,
new UpdateSettingsRequest(settingType, setting, data),
updateSettingsResponseHandler
);
});
}
}
} catch (SettingsException e) {
} catch (SettingsException | IllegalArgumentException e) {
logger.error(e.toString());
status = false;
}

return new AcknowledgedResponse(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public void initializeServicesAndRestHandler(
this.addSettingsUpdateConsumerRequestHandler = new AddSettingsUpdateConsumerRequestHandler(
clusterService,
transportService,
REQUEST_EXTENSION_UPDATE_SETTINGS
REQUEST_EXTENSION_UPDATE_SETTINGS,
settingsModule
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1462,4 +1462,20 @@ public List<String> getListValue(final List<String> value) {
);
}

public void testAddSettingsUpdateConsumer() {
Setting<Integer> testSetting = Setting.intSetting("foo.bar", 1, Property.Dynamic, Property.NodeScope);
Setting<Integer> testSetting2 = Setting.intSetting("foo.bar.baz", 1, Property.Dynamic, Property.NodeScope);
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(testSetting, testSetting2)));
AtomicInteger consumer2 = new AtomicInteger();
service.addSettingsUpdateConsumer(testSetting2, consumer2::set, (s) -> assertTrue(s > 0));
Setting<Integer> wrongKeySetting = Setting.intSetting("foo.bar.wrong", 1, Property.Dynamic, Property.NodeScope);

expectThrows(SettingsException.class, () -> service.addSettingsUpdateConsumer(wrongKeySetting, consumer2::set, (i) -> {
if (i == 42) throw new AssertionError("wrong key");
}));

expectThrows(NullPointerException.class, () -> service.addSettingsUpdateConsumer(null, consumer2::set, (i) -> {
if (i == 42) throw new AssertionError("empty key");
}));
}
}

0 comments on commit c7ca638

Please sign in to comment.