Skip to content

Commit

Permalink
Make multiple settings dynamic for tuning on larger clusters (#16347) (
Browse files Browse the repository at this point in the history
…#16442)

(cherry picked from commit ca40ba4)

Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 41258e7 commit b1f2ff8
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296))
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273))
- Update last seen cluster state in the commit phase ([#16215](https://github.com/opensearch-project/OpenSearch/pull/16215))
- Make multiple settings dynamic for tuning on larger clusters([#16347](https://github.com/opensearch-project/OpenSearch/pull/16347))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
"cluster.publish.timeout",
TimeValue.timeValueMillis(30000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Settings settings;
Expand All @@ -165,7 +166,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Random random;
private final ElectionSchedulerFactory electionSchedulerFactory;
private final SeedHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private TimeValue publishTimeout;
private final TimeValue publishInfoTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
Expand Down Expand Up @@ -248,6 +249,7 @@ public Coordinator(
this.lastJoin = Optional.empty();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout);
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
Expand Down Expand Up @@ -302,6 +304,7 @@ public Coordinator(
);
this.lagDetector = new LagDetector(
settings,
clusterSettings,
transportService.getThreadPool(),
n -> removeNode(n, "lagging"),
transportService::getLocalNode
Expand All @@ -320,6 +323,10 @@ public Coordinator(
this.clusterSettings = clusterSettings;
}

private void setPublishTimeout(TimeValue publishTimeout) {
this.publishTimeout = publishTimeout;
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(
settings,
Expand Down Expand Up @@ -1670,7 +1677,6 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
this.localNodeAckEvent = localNodeAckEvent;
this.ackListener = ackListener;
this.publishListener = publishListener;

this.timeoutHandler = singleNodeDiscovery ? null : transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ protected void doRun() {
if (isClosed.get()) {
logger.debug("{} not starting election", this);
} else {
logger.debug("{} starting election", this);
logger.debug("{} starting election with duration {}", this, duration);
scheduleNextElection(duration, scheduledRunnable);
scheduledRunnable.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,16 @@ public class FollowersChecker {
"cluster.fault_detection.follower_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the timeout for each check sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(150000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand All @@ -115,7 +116,7 @@ public class FollowersChecker {

private final Settings settings;

private final TimeValue followerCheckInterval;
private TimeValue followerCheckInterval;
private TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
Expand Down Expand Up @@ -148,6 +149,7 @@ public FollowersChecker(
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_INTERVAL_SETTING, this::setFollowerCheckInterval);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(
Expand All @@ -167,6 +169,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckInterval(TimeValue followerCheckInterval) {
this.followerCheckInterval = followerCheckInterval;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
this.followerCheckTimeout = followerCheckTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -68,23 +69,26 @@ public class LagDetector {
"cluster.follower_lag.timeout",
TimeValue.timeValueMillis(90000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final TimeValue clusterStateApplicationTimeout;
private TimeValue clusterStateApplicationTimeout;
private final Consumer<DiscoveryNode> onLagDetected;
private final Supplier<DiscoveryNode> localNodeSupplier;
private final ThreadPool threadPool;
private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();

public LagDetector(
final Settings settings,
final ClusterSettings clusterSettings,
final ThreadPool threadPool,
final Consumer<DiscoveryNode> onLagDetected,
final Supplier<DiscoveryNode> localNodeSupplier
) {
this.threadPool = threadPool;
this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, this::setFollowerLagTimeout);
this.onLagDetected = onLagDetected;
this.localNodeSupplier = localNodeSupplier;
}
Expand Down Expand Up @@ -136,6 +140,10 @@ public String toString() {
}
}

private void setFollowerLagTimeout(TimeValue followerCheckLagTimeout) {
this.clusterStateApplicationTimeout = followerCheckLagTimeout;
}

@Override
public String toString() {
return "LagDetector{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator";
private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class);
private final long maxBatchSize;
private long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
Expand All @@ -93,7 +93,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
DEFAULT_SHARD_BATCH_SIZE,
1,
10000,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
Expand Down Expand Up @@ -172,6 +173,7 @@ public ShardsBatchGatewayAllocator(
this.batchStartedAction = batchStartedAction;
this.batchStoreAction = batchStoreAction;
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
clusterSettings.addSettingsUpdateConsumer(GATEWAY_ALLOCATOR_BATCH_SIZE, this::setMaxBatchSize);
this.primaryShardsBatchGatewayAllocatorTimeout = PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setPrimaryBatchAllocatorTimeout);
this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -402,6 +404,7 @@ else if (shardRouting.primary() == primary) {
Iterator<ShardRouting> iterator = newShardsToBatch.values().iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";

logger.debug("Using async fetch batch size {}", maxBatchSize);
long batchSize = maxBatchSize;
Map<ShardId, ShardEntry> perBatchShards = new HashMap<>();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -906,6 +909,10 @@ public int getNumberOfStoreShardBatches() {
return batchIdToStoreShardBatch.size();
}

private void setMaxBatchSize(long maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}

protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) {
this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import static org.opensearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -42,10 +45,10 @@ public void testFollowerCheckTimeoutValueUpdate() {

public void testFollowerCheckTimeoutMaxValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build();
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "151s").build();

assertThrows(
"failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]",
"failed to parse value [151s] for setting [" + setting1.getKey() + "], must be <= [150000ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
Expand All @@ -66,6 +69,38 @@ public void testFollowerCheckTimeoutMinValue() {
);
}

public void testFollowerCheckIntervalValueUpdate() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_INTERVAL_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "10s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(timeSettings1)
.execute()
.actionGet();
assertAcked(response);
assertEquals(timeValueSeconds(10), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
timeSettings1 = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
}

public void testFollowerCheckIntervalMinValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_INTERVAL_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "10ms").build();

assertThrows(
"failed to parse value [10ms] for setting [" + setting1.getKey() + "], must be >= [100ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testLeaderCheckTimeoutValueUpdate() {
Setting<TimeValue> setting1 = LEADER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build();
Expand Down Expand Up @@ -110,4 +145,70 @@ public void testLeaderCheckTimeoutMinValue() {
}
);
}

public void testClusterPublishTimeoutValueUpdate() {
Setting<TimeValue> setting1 = PUBLISH_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(timeSettings1)
.execute()
.actionGet();
assertAcked(response);
assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
timeSettings1 = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
}

public void testClusterPublishTimeoutMinValue() {
Setting<TimeValue> setting1 = PUBLISH_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build();

assertThrows(
"failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testLagDetectorTimeoutUpdate() {
Setting<TimeValue> setting1 = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
Settings lagDetectorTimeout = Settings.builder().put(setting1.getKey(), "30s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(lagDetectorTimeout)
.execute()
.actionGet();

assertAcked(response);
assertEquals(timeValueSeconds(30), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
lagDetectorTimeout = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(lagDetectorTimeout).execute().actionGet();
}
}

public void testLagDetectorTimeoutMinValue() {
Setting<TimeValue> setting1 = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
Settings lagDetectorTimeout = Settings.builder().put(setting1.getKey(), "0s").build();

assertThrows(
"failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(lagDetectorTimeout).execute().actionGet();
}
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -70,8 +71,9 @@ public void setupFixture() {
} else {
followerLagTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(Settings.EMPTY);
}

lagDetector = new LagDetector(settingsBuilder.build(), deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);
Settings settings = settingsBuilder.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
lagDetector = new LagDetector(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);

localNode = CoordinationStateTests.createNode("local");
node1 = CoordinationStateTests.createNode("node1");
Expand Down
Loading

0 comments on commit b1f2ff8

Please sign in to comment.