Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot _status API: Include in-progress snapshots in total shard count and index filter #16394

Merged
merged 6 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,9 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
exception.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);
});

// across multiple snapshots
assertBusy(() -> {
Expand All @@ -636,13 +636,13 @@ public void testSnapshotStatusApiFailureForTooManyShardsAcrossSnapshots() throws
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request")
exception.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request")
);
}, 1, TimeUnit.MINUTES);
});

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

public void testSnapshotStatusForIndexFilter() throws Exception {
Expand All @@ -666,6 +666,7 @@ public void testSnapshotStatusForIndexFilter() throws Exception {
String snapshot = "test-snap-1";
createSnapshot(repositoryName, snapshot, List.of(index1, index2, index3));

// for a completed snapshot
assertBusy(() -> {
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
Expand All @@ -682,6 +683,96 @@ public void testSnapshotStatusForIndexFilter() throws Exception {
}, 1, TimeUnit.MINUTES);
}

public void testSnapshotStatusForIndexFilterForInProgressSnapshot() throws Exception {
String repositoryName = "test-repo";
createRepository(repositoryName, "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true));

logger.info("Create indices");
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
createIndex(index1, index2, index3);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 10; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "baz" + i);
index(index3, "_doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
String inProgressSnapshot = "test-in-progress-snapshot";

logger.info("Create snapshot");
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture = startFullSnapshot(repositoryName, inProgressSnapshot);

logger.info("Block data node");
waitForBlockOnAnyDataNode(repositoryName, TimeValue.timeValueMinutes(1));
awaitNumberOfSnapshotsInProgress(1);

// test normal functioning of index filter for in progress snapshot
assertBusy(() -> {
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2)
.get()
.getSnapshots()
.get(0);
Map<String, SnapshotIndexStatus> snapshotIndexStatusMap = snapshotsStatus.getIndices();
// Although the snapshot contains 3 indices, the response of status api call only contains results for 2
assertEquals(snapshotIndexStatusMap.size(), 2);
assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2));
});

// when a non-existent index is requested in the index-filter
assertBusy(() -> {
// failure due to index not found in snapshot
final String nonExistentIndex1 = "non-existent-index-1";
final String nonExistentIndex2 = "non-existent-index-2";
Exception ex = expectThrows(
Exception.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2, nonExistentIndex1, nonExistentIndex2)
.execute()
.actionGet()
);
String cause = String.format(
Locale.ROOT,
"indices [%s] missing in snapshot [%s] of repository [%s]",
String.join(", ", List.of(nonExistentIndex2, nonExistentIndex1)),
inProgressSnapshot,
repositoryName
);
assertEquals(cause, ex.getCause().getMessage());

// no error for ignore_unavailable = true and status response contains only the found indices
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2, nonExistentIndex1, nonExistentIndex2)
.setIgnoreUnavailable(true)
.get()
.getSnapshots()
.get(0);

Map<String, SnapshotIndexStatus> snapshotIndexStatusMap = snapshotsStatus.getIndices();
assertEquals(snapshotIndexStatusMap.size(), 2);
assertEquals(snapshotIndexStatusMap.keySet(), Set.of(index1, index2));
});

logger.info("Unblock data node");
unblockAllDataNodes(repositoryName);

logger.info("Wait for snapshot to finish");
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));
}

public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
String repositoryName = "test-repo";
String index1 = "test-idx-1";
Expand All @@ -705,6 +796,39 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
createSnapshot(repositoryName, snapshot1, List.of(index1, index2, index3));
createSnapshot(repositoryName, snapshot2, List.of(index1));

assertBusy(() -> {
// failure due to passing index filter for _all value of repository param
Exception ex = expectThrows(
Exception.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus("_all")
.setSnapshots(snapshot1)
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
String cause =
"index list filter is supported only when a single 'repository' is passed, but found 'repository' param = [_all]";
assertTrue(ex.getMessage().contains(cause));
});

assertBusy(() -> {
// failure due to passing index filter for _all value of snapshot param --> gets translated as a blank array
Exception ex = expectThrows(
Exception.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots()
.setIndices(index1, index2, index3)
.execute()
.actionGet()
);
String cause = "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [_all]";
assertTrue(ex.getMessage().contains(cause));
});

assertBusy(() -> {
// failure due to passing index filter for multiple snapshots
ActionRequestValidationException ex = expectThrows(
Expand All @@ -717,9 +841,10 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
.execute()
.actionGet()
);
String cause = "index list filter is supported only for a single snapshot";
String cause =
"index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = [[test-snap-1, test-snap-2]]";
assertTrue(ex.getMessage().contains(cause));
}, 1, TimeUnit.MINUTES);
});

assertBusy(() -> {
// failure due to index not found in snapshot
Expand All @@ -743,7 +868,18 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
);
assertEquals(cause, ex.getCause().getMessage());

}, 1, TimeUnit.MINUTES);
// no error for ignore_unavailable = true and status response contains only the found indices
SnapshotStatus snapshotsStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot2)
.setIndices(index1, index2, index3)
.setIgnoreUnavailable(true)
.get()
.getSnapshots()
.get(0);
assertEquals(1, snapshotsStatus.getIndices().size());
});

assertBusy(() -> {
// failure due to too many shards requested
Expand All @@ -763,12 +899,148 @@ public void testSnapshotStatusFailuresWithIndexFilter() throws Exception {
.actionGet()
);
assertEquals(ex.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(ex.getMessage().endsWith(" is more than the maximum allowed value of shard count [2] for snapshot status request"));
assertTrue(ex.getMessage().contains(" is more than the maximum allowed value of shard count [2] for snapshot status request"));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}, 2, TimeUnit.MINUTES);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
});
}

public void testSnapshotStatusShardLimitOfResponseForInProgressSnapshot() throws Exception {
logger.info("Create repository");
String repositoryName = "test-repo";
createRepository(
repositoryName,
"mock",
Settings.builder()
.put("location", randomRepoPath())
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("wait_after_unblock", 200)
);

logger.info("Create indices");
String index1 = "test-idx-1";
String index2 = "test-idx-2";
String index3 = "test-idx-3";
assertAcked(prepareCreate(index1, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
assertAcked(prepareCreate(index3, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
ensureGreen();

logger.info("Index some data");
indexRandomDocs(index1, 10);
indexRandomDocs(index2, 10);
indexRandomDocs(index3, 10);

logger.info("Create completed snapshot");
String completedSnapshot = "test-completed-snapshot";
String blockedNode = blockNodeWithIndex(repositoryName, index1);
client().admin().cluster().prepareCreateSnapshot(repositoryName, completedSnapshot).setWaitForCompletion(false).get();
waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60));
unblockNode(repositoryName, blockedNode);
waitForCompletion(repositoryName, completedSnapshot, TimeValue.timeValueSeconds(60));

logger.info("Index some more data");
indexRandomDocs(index1, 10);
indexRandomDocs(index2, 10);
indexRandomDocs(index3, 10);
refresh();

logger.info("Create in-progress snapshot");
String inProgressSnapshot = "test-in-progress-snapshot";
blockedNode = blockNodeWithIndex(repositoryName, index1);
client().admin().cluster().prepareCreateSnapshot(repositoryName, inProgressSnapshot).setWaitForCompletion(false).get();
waitForBlock(blockedNode, repositoryName, TimeValue.timeValueSeconds(60));
List<SnapshotStatus> snapshotStatuses = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot, completedSnapshot)
.get()
.getSnapshots();

assertEquals(2, snapshotStatuses.size());
assertEquals(SnapshotsInProgress.State.STARTED, snapshotStatuses.get(0).getState());
assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatuses.get(1).getState());

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 1));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// shard limit exceeded due to inProgress snapshot alone @ without index-filter
assertBusy(() -> {
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().contains(" is more than the maximum allowed value of shard count [1] for snapshot status request")
);
});

// shard limit exceeded due to inProgress snapshot alone @ with index-filter
assertBusy(() -> {
CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.setIndices(index1, index2)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().contains(" is more than the maximum allowed value of shard count [1] for snapshot status request")
);
});

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a slightly higher value");
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 5));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// shard limit exceeded due to passing for inProgress but failing for current + completed

assertBusy(() -> {
SnapshotStatus inProgressSnapshotStatus = client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot)
.get()
.getSnapshots()
.get(0);
assertEquals(3, inProgressSnapshotStatus.getShards().size());

CircuitBreakingException exception = expectThrows(
CircuitBreakingException.class,
() -> client().admin()
.cluster()
.prepareSnapshotStatus(repositoryName)
.setSnapshots(inProgressSnapshot, completedSnapshot)
.execute()
.actionGet()
);
assertEquals(exception.status(), RestStatus.TOO_MANY_REQUESTS);
assertTrue(
exception.getMessage().contains(" is more than the maximum allowed value of shard count [5] for snapshot status request")
);
});

unblockNode(repositoryName, blockedNode);
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));

logger.info("Reset MAX_SHARDS_ALLOWED_IN_STATUS_API to default value");
updateSettingsRequest.persistentSettings(Settings.builder().putNull(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey()));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;

import static org.opensearch.action.ValidateActions.addValidationError;

Expand Down Expand Up @@ -124,8 +125,20 @@
if (snapshots == null) {
validationException = addValidationError("snapshots is null", validationException);
}
if (indices.length != 0 && snapshots.length != 1) {
validationException = addValidationError("index list filter is supported only for a single snapshot", validationException);
if (indices.length != 0) {
if (repository.equals("_all")) {
String error =

Check warning on line 130 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java#L130

Added line #L130 was not covered by tests
"index list filter is supported only when a single 'repository' is passed, but found 'repository' param = [_all]";
validationException = addValidationError(error, validationException);

Check warning on line 132 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java#L132

Added line #L132 was not covered by tests
}
if (snapshots.length != 1) {
// snapshot param was '_all' (length = 0) or a list of snapshots (length > 1)
String snapshotParamValue = snapshots.length == 0 ? "_all" : Arrays.toString(snapshots);
String error = "index list filter is supported only when a single 'snapshot' is passed, but found 'snapshot' param = ["

Check warning on line 137 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java#L137

Added line #L137 was not covered by tests
+ snapshotParamValue
+ "]";
validationException = addValidationError(error, validationException);

Check warning on line 140 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotsStatusRequest.java#L140

Added line #L140 was not covered by tests
}
}
return validationException;
}
Expand Down
Loading
Loading