From 4bdc5f233d3d9c76af85fcd3d79be65621176867 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Mon, 2 Sep 2024 17:18:17 +0530 Subject: [PATCH 1/4] Add primary store size to snapshot status result for shallow V2 snapshots Signed-off-by: Lakshya Taragi --- .../RemoteIndexSnapshotStatusApiIT.java | 2 +- .../snapshots/status/SnapshotStatus.java | 45 +++++++++++++--- .../TransportSnapshotsStatusAction.java | 8 +-- .../org/opensearch/cluster/ClusterInfo.java | 25 ++++++++- .../cluster/InternalClusterInfoService.java | 22 ++++++-- .../main/java/org/opensearch/node/Node.java | 1 + .../opensearch/snapshots/SnapshotInfo.java | 53 +++++++++++++++---- .../snapshots/SnapshotsService.java | 12 ++++- .../create/CreateSnapshotResponseTests.java | 3 +- .../get/GetSnapshotsResponseTests.java | 3 +- .../snapshots/status/SnapshotStatusTests.java | 4 +- .../opensearch/cluster/ClusterInfoTests.java | 3 +- .../allocation/DiskThresholdMonitorTests.java | 2 +- ...dexShardConstraintDeciderOverlapTests.java | 2 +- .../RemoteShardsBalancerBaseTestCase.java | 2 +- .../decider/DiskThresholdDeciderTests.java | 2 +- .../DiskThresholdDeciderUnitTests.java | 23 ++++++-- .../tiering/TieringRequestValidatorTests.java | 8 +-- .../BlobStoreRepositoryRestoreTests.java | 3 +- .../snapshots/SnapshotInfoTests.java | 33 ++++++++---- .../snapshots/SnapshotResiliencyTests.java | 13 ++--- ...ckEventuallyConsistentRepositoryTests.java | 9 ++-- .../MockInternalClusterInfoService.java | 3 +- .../AbstractSnapshotIntegTestCase.java | 3 +- 24 files changed, 217 insertions(+), 67 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java index e84de36df2fca..ec51977321c53 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java @@ -278,7 +278,7 @@ private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolea if (hasIndexFilter) { assertEquals(0, snapshotStatus.getStats().getTotalSize()); } else { - // TODO: after adding primary store size at the snapshot level, total size here should be > 0 + assertTrue(snapshotStatus.getStats().getTotalSize() > 0L); } // assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels assertEquals(0, snapshotStatus.getStats().getTotalFileCount()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index e0f380b3ebbe6..e27c404c6e6bc 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.snapshots.status; +import org.opensearch.Version; import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.cluster.SnapshotsInProgress.State; import org.opensearch.common.Nullable; @@ -86,6 +87,8 @@ public class SnapshotStatus implements ToXContentObject, Writeable { private SnapshotStats stats; + private final long initialTotalSizeInBytes; + @Nullable private final Boolean includeGlobalState; @@ -96,7 +99,12 @@ public class SnapshotStatus implements ToXContentObject, Writeable { includeGlobalState = in.readOptionalBoolean(); final long startTime = in.readLong(); final long time = in.readLong(); - updateShardStats(startTime, time); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + initialTotalSizeInBytes = in.readOptionalLong(); + } else { + initialTotalSizeInBytes = 0L; + } + updateShardStats(startTime, time, initialTotalSizeInBytes); } SnapshotStatus( @@ -105,7 +113,8 @@ public class SnapshotStatus implements ToXContentObject, Writeable { List shards, Boolean includeGlobalState, long startTime, - long time + long time, + long initialTotalSizeInBytes ) { this.snapshot = Objects.requireNonNull(snapshot); this.state = Objects.requireNonNull(state); @@ -113,7 +122,9 @@ public class SnapshotStatus implements ToXContentObject, Writeable { this.includeGlobalState = includeGlobalState; shardsStats = new SnapshotShardsStats(shards); assert time >= 0 : "time must be >= 0 but received [" + time + "]"; - updateShardStats(startTime, time); + this.initialTotalSizeInBytes = initialTotalSizeInBytes; + assert initialTotalSizeInBytes >= 0 : "initialTotalSizeInBytes must be >= 0 but received [" + initialTotalSizeInBytes + "]"; + updateShardStats(startTime, time, initialTotalSizeInBytes); } private SnapshotStatus( @@ -123,7 +134,8 @@ private SnapshotStatus( Map indicesStatus, SnapshotShardsStats shardsStats, SnapshotStats stats, - Boolean includeGlobalState + Boolean includeGlobalState, + long initialTotalSizeInBytes ) { this.snapshot = snapshot; this.state = state; @@ -132,6 +144,7 @@ private SnapshotStatus( this.shardsStats = shardsStats; this.stats = stats; this.includeGlobalState = includeGlobalState; + this.initialTotalSizeInBytes = initialTotalSizeInBytes; } /** @@ -204,6 +217,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(includeGlobalState); out.writeLong(stats.getStartTime()); out.writeLong(stats.getTime()); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalLong(initialTotalSizeInBytes); + } } @Override @@ -224,6 +240,7 @@ public SnapshotStats getStats() { private static final String STATE = "state"; private static final String INDICES = "indices"; private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; + private static final String INITIAL_TOTAL_SIZE_IN_BYTES = "initial_total_size_in_bytes"; @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -235,6 +252,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (includeGlobalState != null) { builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); } + if (initialTotalSizeInBytes != 0) { + builder.field(INITIAL_TOTAL_SIZE_IN_BYTES, initialTotalSizeInBytes); + } builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params); builder.field(SnapshotStats.Fields.STATS, stats, params); builder.startObject(INDICES); @@ -256,6 +276,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws String uuid = (String) parsedObjects[i++]; String rawState = (String) parsedObjects[i++]; Boolean includeGlobalState = (Boolean) parsedObjects[i++]; + Long initialTotalSizeInBytes = (Long) parsedObjects[i++]; SnapshotStats stats = ((SnapshotStats) parsedObjects[i++]); SnapshotShardsStats shardsStats = ((SnapshotShardsStats) parsedObjects[i++]); @SuppressWarnings("unchecked") @@ -276,7 +297,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws shards.addAll(index.getShards().values()); } } - return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState); + return new SnapshotStatus( + snapshot, + state, + shards, + indicesStatus, + shardsStats, + stats, + includeGlobalState, + initialTotalSizeInBytes + ); } ); static { @@ -285,6 +315,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws PARSER.declareString(constructorArg(), new ParseField(UUID)); PARSER.declareString(constructorArg(), new ParseField(STATE)); PARSER.declareBoolean(optionalConstructorArg(), new ParseField(INCLUDE_GLOBAL_STATE)); + PARSER.declareLong(optionalConstructorArg(), new ParseField(INITIAL_TOTAL_SIZE_IN_BYTES)); PARSER.declareField( constructorArg(), SnapshotStats::fromXContent, @@ -299,8 +330,8 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept return PARSER.parse(parser, null); } - private void updateShardStats(long startTime, long time) { - stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0); + private void updateShardStats(long startTime, long time, long initialTotalSizeInBytes) { + stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSizeInBytes, 0); shardsStats = new SnapshotShardsStats(shards); for (SnapshotIndexShardStatus shard : shards) { // BWC: only update timestamps when we did not get a start time from an old node diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 8228cb6301c8c..bda6a9e876d99 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -298,7 +298,8 @@ private void buildResponse( Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState(), entry.startTime(), - Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L) + Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L), + 0L ) ); } @@ -345,7 +346,7 @@ private void loadRepositoryData( boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0; long initialSnapshotTotalSize = 0; if (isShallowV2Snapshot && request.indices().length == 0) { - // TODO: add primary store size in bytes at the snapshot level + initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes(); } for (Map.Entry shardStatus : shardStatuses.entrySet()) { @@ -378,7 +379,8 @@ private void loadRepositoryData( snapshotInfo.includeGlobalState(), startTime, // Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0 - (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime + (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime, + initialSnapshotTotalSize ) ); } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java index 7216c447acc3e..595e5fd89cba6 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java @@ -69,11 +69,13 @@ public class ClusterInfo implements ToXContentFragment, Writeable { final Map routingToDataPath; final Map reservedSpace; final Map nodeFileCacheStats; + final long primaryStoreSize; + private long avgTotalBytes; private long avgFreeByte; protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L); } /** @@ -84,6 +86,7 @@ protected ClusterInfo() { * @param shardSizes a shardkey to size in bytes mapping per shard. * @param routingToDataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path + * @param primaryStoreSize total size in bytes for all the primary shards * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -92,7 +95,8 @@ public ClusterInfo( final Map shardSizes, final Map routingToDataPath, final Map reservedSpace, - final Map nodeFileCacheStats + final Map nodeFileCacheStats, + final long primaryStoreSize ) { this.leastAvailableSpaceUsage = leastAvailableSpaceUsage; this.shardSizes = shardSizes; @@ -100,6 +104,7 @@ public ClusterInfo( this.routingToDataPath = routingToDataPath; this.reservedSpace = reservedSpace; this.nodeFileCacheStats = nodeFileCacheStats; + this.primaryStoreSize = primaryStoreSize; calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage); } @@ -121,6 +126,11 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.nodeFileCacheStats = Map.of(); } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + this.primaryStoreSize = in.readOptionalLong(); + } else { + this.primaryStoreSize = 0L; + } calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage); } @@ -166,6 +176,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o)); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalLong(this.primaryStoreSize); + } } public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -220,6 +233,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } builder.endArray(); // end "reserved_sizes" + builder.field("primary_store_size", this.primaryStoreSize); return builder; } @@ -246,6 +260,13 @@ public Map getNodeFileCacheStats() { return Collections.unmodifiableMap(this.nodeFileCacheStats); } + /** + * Returns the total size in bytes for all the primary shards + */ + public long getPrimaryStoreSize() { + return primaryStoreSize; + } + /** * Returns the shard size for the given shard routing or null it that metric is not available. */ diff --git a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java index e381b8f244bf3..ded8ef8797c1b 100644 --- a/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java @@ -115,6 +115,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile Map nodeFileCacheStats; private volatile IndicesStatsSummary indicesStatsSummary; // null if this node is not currently the cluster-manager + + private volatile long primaryStoreSize; private final AtomicReference refreshAndRescheduleRunnable = new AtomicReference<>(); private volatile boolean enabled; private volatile TimeValue fetchTimeout; @@ -127,6 +129,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi this.mostAvailableSpaceUsages = Map.of(); this.nodeFileCacheStats = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; + this.primaryStoreSize = 0L; this.threadPool = threadPool; this.client = client; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); @@ -213,7 +216,8 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.shardSizes, indicesStatsSummary.shardRoutingToDataPath, indicesStatsSummary.reservedSpace, - nodeFileCacheStats + nodeFileCacheStats, + primaryStoreSize ); } @@ -305,8 +309,13 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { final Map shardSizeByIdentifierBuilder = new HashMap<>(); final Map dataPathByShardRoutingBuilder = new HashMap<>(); final Map reservedSpaceBuilders = new HashMap<>(); - buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders); - + primaryStoreSize = buildShardLevelInfo( + logger, + stats, + shardSizeByIdentifierBuilder, + dataPathByShardRoutingBuilder, + reservedSpaceBuilders + ); final Map rsrvdSpace = new HashMap<>(); reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build())); @@ -366,13 +375,14 @@ public void addListener(Consumer clusterInfoConsumer) { listeners.add(clusterInfoConsumer); } - static void buildShardLevelInfo( + static long buildShardLevelInfo( Logger logger, ShardStats[] stats, final Map shardSizes, final Map newShardRoutingToDataPath, final Map reservedSpaceByShard ) { + long currentPrimaryStoreSize = 0L; for (ShardStats s : stats) { final ShardRouting shardRouting = s.getShardRouting(); newShardRoutingToDataPath.put(shardRouting, s.getDataPath()); @@ -382,6 +392,9 @@ static void buildShardLevelInfo( continue; } final long size = storeStats.sizeInBytes(); + if (shardRouting.primary()) { + currentPrimaryStoreSize += size; + } final long reserved = storeStats.getReservedSize().getBytes(); final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); @@ -396,6 +409,7 @@ static void buildShardLevelInfo( reservedSpaceBuilder.add(shardRouting.shardId(), reserved); } } + return currentPrimaryStoreSize; } static void fillDiskUsagePerNode( diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a8d4ebcf23dab..8fdec0096a6e6 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1214,6 +1214,7 @@ protected Node( clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, + clusterInfoService, actionModule.getActionFilters(), remoteStorePinnedTimestampService, remoteStoreSettings diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 08433dc6f2e0b..4aa4589902cbe 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -101,6 +101,8 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String PINNED_TIMESTAMP = "pinned_timestamp"; + private static final String SNAPSHOT_SIZE_IN_BYTES = "snapshot_size_in_bytes"; + private static final String USER_METADATA = "metadata"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime) @@ -125,6 +127,8 @@ public static final class SnapshotInfoBuilder { private Boolean remoteStoreIndexShallowCopy = null; private long pinnedTimestamp = 0L; + private long snapshotSizeInBytes = 0L; + private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -185,6 +189,10 @@ private void setPinnedTimestamp(long pinnedTimestamp) { this.pinnedTimestamp = pinnedTimestamp; } + private void setSnapshotSizeInBytes(long snapshotSizeInBytes) { + this.snapshotSizeInBytes = snapshotSizeInBytes; + } + private void setShardFailures(List shardFailures) { this.shardFailures = shardFailures; } @@ -225,7 +233,8 @@ public SnapshotInfo build() { includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } } @@ -281,6 +290,7 @@ int getSuccessfulShards() { new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY) ); SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setPinnedTimestamp, new ParseField(PINNED_TIMESTAMP)); + SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setSnapshotSizeInBytes, new ParseField(SNAPSHOT_SIZE_IN_BYTES)); SNAPSHOT_INFO_PARSER.declareObjectArray( SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, @@ -318,6 +328,9 @@ int getSuccessfulShards() { private Boolean remoteStoreIndexShallowCopy; private long pinnedTimestamp; + + private long snapshotSizeInBytes; + @Nullable private final Map userMetadata; @@ -327,11 +340,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0, 0L); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0, 0L); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -350,6 +363,7 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) { entry.includeGlobalState(), entry.userMetadata(), entry.remoteStoreIndexShallowCopy(), + 0L, 0L ); } @@ -398,7 +412,8 @@ public SnapshotInfo( Boolean includeGlobalState, Map userMetadata, Boolean remoteStoreIndexShallowCopy, - long pinnedTimestamp + long pinnedTimestamp, + long snapshotSizeInBytes ) { this( snapshotId, @@ -415,7 +430,8 @@ public SnapshotInfo( includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } @@ -434,7 +450,8 @@ public SnapshotInfo( Boolean includeGlobalState, Map userMetadata, Boolean remoteStoreIndexShallowCopy, - long pinnedTimestamp + long pinnedTimestamp, + long snapshotSizeInBytes ) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); @@ -451,6 +468,7 @@ public SnapshotInfo( this.userMetadata = userMetadata; this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; this.pinnedTimestamp = pinnedTimestamp; + this.snapshotSizeInBytes = snapshotSizeInBytes; } /** @@ -475,6 +493,7 @@ public SnapshotInfo(final StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_2_17_0)) { pinnedTimestamp = in.readVLong(); + snapshotSizeInBytes = in.readVLong(); } } @@ -594,6 +613,10 @@ public long getPinnedTimestamp() { return pinnedTimestamp; } + public long getSnapshotSizeInBytes() { + return snapshotSizeInBytes; + } + /** * Returns shard failures; an empty list will be returned if there were no shard * failures, or if {@link #state()} returns {@code null}. @@ -663,6 +686,8 @@ public String toString() { + remoteStoreIndexShallowCopy + ", pinnedTimestamp=" + pinnedTimestamp + + ", snapshotSizeInBytes=" + + snapshotSizeInBytes + '}'; } @@ -763,6 +788,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final if (pinnedTimestamp != 0) { builder.field(PINNED_TIMESTAMP, pinnedTimestamp); } + if (snapshotSizeInBytes != 0) { + builder.field(SNAPSHOT_SIZE_IN_BYTES, snapshotSizeInBytes); + } builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -812,6 +840,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr int totalShards = 0; int successfulShards = 0; long pinnedTimestamp = 0; + long snapshotSizeInBytes = 0; Boolean includeGlobalState = null; Boolean remoteStoreIndexShallowCopy = null; Map userMetadata = null; @@ -855,6 +884,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr remoteStoreIndexShallowCopy = parser.booleanValue(); } else if (PINNED_TIMESTAMP.equals(currentFieldName)) { pinnedTimestamp = parser.longValue(); + } else if (SNAPSHOT_SIZE_IN_BYTES.equals(currentFieldName)) { + snapshotSizeInBytes = parser.longValue(); } } else if (token == XContentParser.Token.START_ARRAY) { if (DATA_STREAMS.equals(currentFieldName)) { @@ -908,7 +939,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } @@ -942,6 +974,7 @@ public void writeTo(final StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_2_17_0)) { out.writeVLong(pinnedTimestamp); + out.writeVLong(snapshotSizeInBytes); } } @@ -976,7 +1009,8 @@ public boolean equals(Object o) { && Objects.equals(shardFailures, that.shardFailures) && Objects.equals(userMetadata, that.userMetadata) && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy) - && Objects.equals(pinnedTimestamp, that.pinnedTimestamp); + && Objects.equals(pinnedTimestamp, that.pinnedTimestamp) + && Objects.equals(snapshotSizeInBytes, that.snapshotSizeInBytes); } @Override @@ -997,7 +1031,8 @@ public int hashCode() { shardFailures, userMetadata, remoteStoreIndexShallowCopy, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index f6e550525a3e5..ec049807fa42b 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -46,6 +46,7 @@ import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -162,6 +163,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final RepositoriesService repositoriesService; + private final ClusterInfoService clusterInfoService; + private final RemoteStoreLockManagerFactory remoteStoreLockManagerFactory; private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; @@ -229,6 +232,7 @@ public SnapshotsService( IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, TransportService transportService, + ClusterInfoService clusterInfoService, ActionFilters actionFilters, @Nullable RemoteStorePinnedTimestampService remoteStorePinnedTimestampService, RemoteStoreSettings remoteStoreSettings @@ -247,6 +251,7 @@ public SnapshotsService( remoteStoreSettings.getSegmentsPathFixedPrefix() ); this.transportService = transportService; + this.clusterInfoService = clusterInfoService; this.remoteStorePinnedTimestampService = remoteStorePinnedTimestampService; // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. @@ -466,6 +471,7 @@ public TimeValue timeout() { */ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener listener) { long pinnedTimestamp = System.currentTimeMillis(); + long snapshotSizeInBytes = clusterInfoService.getClusterInfo().getPrimaryStoreSize(); final String repositoryName = request.repository(); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); @@ -532,7 +538,8 @@ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionLi request.includeGlobalState(), userMeta, true, - pinnedTimestamp + pinnedTimestamp, + snapshotSizeInBytes ); if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager"); @@ -1956,7 +1963,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met entry.includeGlobalState(), entry.userMetadata(), entry.remoteStoreIndexShallowCopy(), - 0 + 0, + 0L ); final StepListener metadataListener = new StepListener<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 2feb0d3ba9405..9aa12caf2ea6e 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -96,7 +96,8 @@ protected CreateSnapshotResponse createTestInstance() { globalState, SnapshotInfoTests.randomUserMetadata(), false, - 0 + 0, + 0L ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index 58af390d194d3..2219a65858cb3 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -78,7 +78,8 @@ protected GetSnapshotsResponse createTestInstance() { randomBoolean(), SnapshotInfoTests.randomUserMetadata(), false, - 0 + 0, + 0L ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java index 3918e5d9b235c..372dfb5c49528 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java @@ -61,7 +61,7 @@ public void testToString() throws Exception { List snapshotIndexShardStatuses = new ArrayList<>(); snapshotIndexShardStatuses.add(snapshotIndexShardStatus); boolean includeGlobalState = randomBoolean(); - SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L); + SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L, 0L); int initializingShards = 0; int startedShards = 0; @@ -213,7 +213,7 @@ protected SnapshotStatus createTestInstance() { snapshotIndexShardStatuses.add(snapshotIndexShardStatus); } boolean includeGlobalState = randomBoolean(); - return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L); + return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L, 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java index 4ec7db2f3d552..f652c65a20a8d 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterInfoTests.java @@ -51,7 +51,8 @@ public void testSerialization() throws Exception { randomShardSizes(), randomRoutingToDataPath(), randomReservedSpace(), - randomFileCacheStats() + randomFileCacheStats(), + randomLong() ); BytesStreamOutput output = new BytesStreamOutput(); clusterInfo.writeTo(output); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 6ab57d10b05c1..298a04eccc064 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -902,7 +902,7 @@ private static ClusterInfo clusterInfo( final Map diskUsages, final Map reservedSpace ) { - return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of()); + return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of(), 0L); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java index 7f2f048485318..0d9f6e63e31d7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexShardConstraintDeciderOverlapTests.java @@ -176,7 +176,7 @@ public DevNullClusterInfo( final Map shardSizes, final Map reservedSpace ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of()); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of(), 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index 6a03a1f79bcde..6b3aa0ed1a32a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -277,7 +277,7 @@ public DevNullClusterInfo( final Map mostAvailableSpaceUsage, final Map shardSizes ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of()); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of(), 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 94e91c3f7c3c1..4db9851f6150c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1653,7 +1653,7 @@ static class DevNullClusterInfo extends ClusterInfo { Map reservedSpace, final Map nodeFileCacheStats ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, nodeFileCacheStats); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, nodeFileCacheStats, 0L); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 622d3a2c920b5..f5c300460db3d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -127,7 +127,15 @@ public void testCanAllocateUsesMaxAvailableSpace() { final Map shardSizes = new HashMap<>(); shardSizes.put("[test][0][p]", 10L); // 10 bytes - final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of()); + final ClusterInfo clusterInfo = new ClusterInfo( + leastAvailableUsages, + mostAvailableUsage, + shardSizes, + Map.of(), + Map.of(), + Map.of(), + 10L + ); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), @@ -203,7 +211,15 @@ public void testCannotAllocateDueToLackOfDiskResources() { // way bigger than available space final long shardSize = randomIntBetween(110, 1000); shardSizes.put("[test][0][p]", shardSize); - ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo( + leastAvailableUsages, + mostAvailableUsage, + shardSizes, + Map.of(), + Map.of(), + Map.of(), + shardSize + ); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), @@ -326,7 +342,8 @@ public void testCanRemainUsesLeastAvailableSpace() { shardSizes, shardRoutingMap, Map.of(), - Map.of() + Map.of(), + 30L ); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), diff --git a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java index 6b6f74353812b..fc07a1a4b783e 100644 --- a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java +++ b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java @@ -171,7 +171,7 @@ public void testGetTotalIndexSize() { Map diskUsages = diskUsages(1, 100, 50); final Map shardSizes = new HashMap<>(); shardSizes.put("[test_index][0][p]", 10L); // 10 bytes - ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of(), 10L); assertEquals(10, getIndexPrimaryStoreSize(clusterState, clusterInfo, indexName)); } @@ -185,7 +185,7 @@ public void testValidateEligibleNodesCapacityWithAllAccepted() { Map diskUsages = diskUsages(1, 100, 50); final Map shardSizes = new HashMap<>(); shardSizes.put("[test_index][0][p]", 10L); // 10 bytes - ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of(), 10L); TieringValidationResult tieringValidationResult = new TieringValidationResult(indices); validateEligibleNodesCapacity(clusterInfo, clusterState, tieringValidationResult); assertEquals(indices, tieringValidationResult.getAcceptedIndices()); @@ -202,7 +202,7 @@ public void testValidateEligibleNodesCapacityWithAllRejected() { Map diskUsages = diskUsages(1, 100, 10); final Map shardSizes = new HashMap<>(); shardSizes.put("[test_index][0][p]", 20L); // 20 bytes - ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of(), 20L); TieringValidationResult tieringValidationResult = new TieringValidationResult(indices); validateEligibleNodesCapacity(clusterInfo, clusterState, tieringValidationResult); assertEquals(indices.size(), tieringValidationResult.getRejectedIndices().size()); @@ -305,7 +305,7 @@ private static DiskThresholdSettings diskThresholdSettings(String low, String hi private static ClusterInfo clusterInfo(int noOfNodes, long totalBytes, long freeBytes) { final Map diskUsages = diskUsages(noOfNodes, totalBytes, freeBytes); - return new ClusterInfo(diskUsages, null, null, null, Map.of(), Map.of()); + return new ClusterInfo(diskUsages, null, null, null, Map.of(), Map.of(), 0L); } private static Map diskUsages(int noOfSearchNodes, long totalBytes, long freeBytes) { diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 7fc987dcfa9bb..b64fefa2c40f5 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -215,7 +215,8 @@ public void testSnapshotWithConflictingName() throws Exception { true, Collections.emptyMap(), false, - 0 + 0, + 0L ), Version.CURRENT, Function.identity(), diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java index 684a8dd36fccc..0355255213743 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java @@ -87,7 +87,8 @@ protected SnapshotInfo createTestInstance() { includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - 0 + 0, + 0L ); } @@ -116,7 +117,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 1: int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10)); @@ -135,7 +137,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 2: return new SnapshotInfo( @@ -150,7 +153,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 3: return new SnapshotInfo( @@ -165,7 +169,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 4: return new SnapshotInfo( @@ -180,7 +185,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 5: int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100)); @@ -207,7 +213,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 6: return new SnapshotInfo( @@ -222,7 +229,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { Boolean.FALSE.equals(instance.includeGlobalState()), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 7: return new SnapshotInfo( @@ -237,7 +245,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata), instance.isRemoteStoreIndexShallowCopyEnabled(), - 0 + 0, + 0L ); case 8: List dataStreams = randomValueOtherThan( @@ -256,7 +265,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), instance.isRemoteStoreIndexShallowCopyEnabled(), - 123456 + 123456, + 123456L ); case 9: return new SnapshotInfo( @@ -271,7 +281,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.includeGlobalState(), instance.userMetadata(), Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()), - 123456 + 123456, + 123456L ); default: throw new IllegalArgumentException("invalid randomization case"); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 350c6f9ae8f6b..c6c3016bf6c46 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -456,7 +456,7 @@ public void testSearchableSnapshotOverSubscription() { for (TestClusterNodes.TestClusterNode node : testClusterNodes.nodes.values()) { nodeFileCacheStats.put(node.node.getId(), new FileCacheStats(0, 1, 0, 0, 0, 0, 0)); } - ClusterInfo clusterInfo = new ClusterInfo(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), nodeFileCacheStats); + ClusterInfo clusterInfo = new ClusterInfo(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), nodeFileCacheStats, 0L); testClusterNodes.nodes.values().forEach(node -> when(node.getMockClusterInfoService().getClusterInfo()).thenReturn(clusterInfo)); final StepListener createSnapshotResponseListener = new StepListener<>(); @@ -2006,21 +2006,22 @@ public void onFailure(final Exception e) { ); remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, threadPool); final ActionFilters actionFilters = new ActionFilters(emptySet()); + nodeEnv = new NodeEnvironment(settings, environment); + final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); + final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); + client = new NodeClient(settings, threadPool); + clusterInfoService = Mockito.mock(ClusterInfoService.class); snapshotsService = new SnapshotsService( settings, clusterService, indexNameExpressionResolver, repositoriesService, transportService, + clusterInfoService, actionFilters, null, DefaultRemoteStoreSettings.INSTANCE ); - nodeEnv = new NodeEnvironment(settings, environment); - final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); - final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); - client = new NodeClient(settings, threadPool); - clusterInfoService = Mockito.mock(ClusterInfoService.class); final SetOnce rerouteServiceSetOnce = new SetOnce<>(); final SnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService( settings, diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 06a486b3cb997..7a6f865afb42a 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -235,7 +235,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { true, Collections.emptyMap(), false, - 0 + 0, + 0L ), Version.CURRENT, Function.identity(), @@ -263,7 +264,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { true, Collections.emptyMap(), false, - 0 + 0, + 0L ), Version.CURRENT, Function.identity(), @@ -293,7 +295,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { true, Collections.emptyMap(), false, - 0 + 0, + 0L ), Version.CURRENT, Function.identity(), diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 35ca5d80aeb4e..40f3f864efd60 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -138,7 +138,8 @@ class SizeFakingClusterInfo extends ClusterInfo { delegate.shardSizes, delegate.routingToDataPath, delegate.reservedSpace, - delegate.nodeFileCacheStats + delegate.nodeFileCacheStats, + delegate.getPrimaryStoreSize() ); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 0bfa70a771f65..9bd21954bfeaa 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -606,7 +606,8 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget( f -> repo.finalizeSnapshot( From ececc75f6ecb7ab7b88db5f49868a9646168f900 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Tue, 3 Sep 2024 13:11:36 +0530 Subject: [PATCH 2/4] Fix build Signed-off-by: Lakshya Taragi --- .../snapshots/SnapshotStatusApisIT.java | 27 ++++++++++++------- .../snapshots/status/SnapshotStatus.java | 17 +----------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java index c3214022df663..9e859fdb67009 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java @@ -116,7 +116,7 @@ public void testStatusApiConsistency() { assertEquals(snapshotStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime()); } - public void testStatusAPICallForShallowCopySnapshot() { + public void testStatusAPICallForShallowCopySnapshot() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used for the test"); internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); @@ -136,15 +136,24 @@ public void testStatusAPICallForShallowCopySnapshot() { final String snapshot = "snapshot"; createFullSnapshot(snapshotRepoName, snapshot); - final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot); - assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); + assertBusy(() -> { + final SnapshotStatus snapshotStatus = client().admin() + .cluster() + .prepareSnapshotStatus(snapshotRepoName) + .setSnapshots(snapshot) + .execute() + .actionGet() + .getSnapshots() + .get(0); + assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); - final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); - assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); - assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); - assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); + final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); + assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); + assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); + assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); + }, 1, TimeUnit.MINUTES); } public void testStatusAPICallInProgressSnapshot() throws Exception { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index e27c404c6e6bc..dbd7cc244c37b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -240,7 +240,6 @@ public SnapshotStats getStats() { private static final String STATE = "state"; private static final String INDICES = "indices"; private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; - private static final String INITIAL_TOTAL_SIZE_IN_BYTES = "initial_total_size_in_bytes"; @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -252,9 +251,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (includeGlobalState != null) { builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState); } - if (initialTotalSizeInBytes != 0) { - builder.field(INITIAL_TOTAL_SIZE_IN_BYTES, initialTotalSizeInBytes); - } builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params); builder.field(SnapshotStats.Fields.STATS, stats, params); builder.startObject(INDICES); @@ -276,7 +272,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws String uuid = (String) parsedObjects[i++]; String rawState = (String) parsedObjects[i++]; Boolean includeGlobalState = (Boolean) parsedObjects[i++]; - Long initialTotalSizeInBytes = (Long) parsedObjects[i++]; SnapshotStats stats = ((SnapshotStats) parsedObjects[i++]); SnapshotShardsStats shardsStats = ((SnapshotShardsStats) parsedObjects[i++]); @SuppressWarnings("unchecked") @@ -297,16 +292,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws shards.addAll(index.getShards().values()); } } - return new SnapshotStatus( - snapshot, - state, - shards, - indicesStatus, - shardsStats, - stats, - includeGlobalState, - initialTotalSizeInBytes - ); + return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState, 0L); } ); static { @@ -315,7 +301,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws PARSER.declareString(constructorArg(), new ParseField(UUID)); PARSER.declareString(constructorArg(), new ParseField(STATE)); PARSER.declareBoolean(optionalConstructorArg(), new ParseField(INCLUDE_GLOBAL_STATE)); - PARSER.declareLong(optionalConstructorArg(), new ParseField(INITIAL_TOTAL_SIZE_IN_BYTES)); PARSER.declareField( constructorArg(), SnapshotStats::fromXContent, From 77ccc5d7ef643d859ec7f45141d2cf340527059c Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Wed, 4 Sep 2024 17:40:37 +0530 Subject: [PATCH 3/4] Add snapshot_size_in_bytes field to XContent of SnapshotInfo Signed-off-by: Lakshya Taragi --- .../org/opensearch/cluster/ClusterInfo.java | 29 ++++++++++++ .../opensearch/snapshots/SnapshotInfo.java | 44 +++++++++++++++++-- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java index 595e5fd89cba6..200dd53b9fe5a 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java @@ -78,6 +78,35 @@ protected ClusterInfo() { this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L); } + /** + * Creates a new ClusterInfo instance. + * + * @param leastAvailableSpaceUsage a node id to disk usage mapping for the path that has the least available space on the node. + * @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node. + * @param shardSizes a shardkey to size in bytes mapping per shard. + * @param routingToDataPath the shard routing to datapath mapping + * @param reservedSpace reserved space per shard broken down by node and data path + * @see #shardIdentifierFromRouting + */ + public ClusterInfo( + final Map leastAvailableSpaceUsage, + final Map mostAvailableSpaceUsage, + final Map shardSizes, + final Map routingToDataPath, + final Map reservedSpace, + final Map nodeFileCacheStats + ) { + this( + leastAvailableSpaceUsage, + mostAvailableSpaceUsage, + shardSizes, + routingToDataPath, + reservedSpace, + nodeFileCacheStats, + 0L + ); + } + /** * Creates a new ClusterInfo instance. * diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 4aa4589902cbe..6d34db35c8bc1 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -340,11 +340,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0, 0L); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0L, 0L); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0, 0L); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0L, 0L); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -396,7 +396,42 @@ public SnapshotInfo( includeGlobalState, userMetadata, remoteStoreIndexShallowCopy, - 0 + 0L, + 0L + ); + } + + public SnapshotInfo( + SnapshotId snapshotId, + List indices, + List dataStreams, + long startTime, + String reason, + long endTime, + int totalShards, + List shardFailures, + Boolean includeGlobalState, + Map userMetadata, + Boolean remoteStoreIndexShallowCopy, + long pinnedTimestamp + ) { + this( + snapshotId, + indices, + dataStreams, + snapshotState(reason, shardFailures), + reason, + Version.CURRENT, + startTime, + endTime, + totalShards, + totalShards - shardFailures.size(), + shardFailures, + includeGlobalState, + userMetadata, + remoteStoreIndexShallowCopy, + pinnedTimestamp, + 0L ); } @@ -726,6 +761,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (pinnedTimestamp != 0) { builder.field(PINNED_TIMESTAMP, pinnedTimestamp); } + if (snapshotSizeInBytes != 0) { + builder.field(SNAPSHOT_SIZE_IN_BYTES, snapshotSizeInBytes); + } builder.startArray(INDICES); for (String index : indices) { From cefa0bc55b1fbe61ef6dd2b5ab3fe0b5e7a68a7a Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Mon, 23 Sep 2024 15:45:17 +0530 Subject: [PATCH 4/4] Add version change todos and fix potential breaking changes Signed-off-by: Lakshya Taragi --- .../snapshots/SnapshotStatusApisIT.java | 27 +++++++------------ .../snapshots/status/SnapshotStatus.java | 13 +++++++++ .../org/opensearch/cluster/ClusterInfo.java | 12 +++------ .../opensearch/snapshots/SnapshotInfo.java | 6 +++++ 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java index 9e859fdb67009..c3214022df663 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java @@ -116,7 +116,7 @@ public void testStatusApiConsistency() { assertEquals(snapshotStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime()); } - public void testStatusAPICallForShallowCopySnapshot() throws Exception { + public void testStatusAPICallForShallowCopySnapshot() { disableRepoConsistencyCheck("Remote store repository is being used for the test"); internalCluster().startClusterManagerOnlyNode(); internalCluster().startDataOnlyNode(); @@ -136,24 +136,15 @@ public void testStatusAPICallForShallowCopySnapshot() throws Exception { final String snapshot = "snapshot"; createFullSnapshot(snapshotRepoName, snapshot); - assertBusy(() -> { - final SnapshotStatus snapshotStatus = client().admin() - .cluster() - .prepareSnapshotStatus(snapshotRepoName) - .setSnapshots(snapshot) - .execute() - .actionGet() - .getSnapshots() - .get(0); - assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotStatus snapshotStatus = getSnapshotStatus(snapshotRepoName, snapshot); + assertThat(snapshotStatus.getState(), is(SnapshotsInProgress.State.SUCCESS)); - final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); - assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); - assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); - assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); - assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); - }, 1, TimeUnit.MINUTES); + final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatus, indexName); + assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE)); + assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L)); + assertThat(snapshotShardState.getStats().getIncrementalFileCount(), greaterThan(0)); + assertThat(snapshotShardState.getStats().getIncrementalSize(), greaterThan(0L)); } public void testStatusAPICallInProgressSnapshot() throws Exception { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index dbd7cc244c37b..77aa72025a0a2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -99,6 +99,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable { includeGlobalState = in.readOptionalBoolean(); final long startTime = in.readLong(); final long time = in.readLong(); + // TODO: change version to 2_18_0 if (in.getVersion().onOrAfter(Version.CURRENT)) { initialTotalSizeInBytes = in.readOptionalLong(); } else { @@ -107,6 +108,17 @@ public class SnapshotStatus implements ToXContentObject, Writeable { updateShardStats(startTime, time, initialTotalSizeInBytes); } + SnapshotStatus( + Snapshot snapshot, + State state, + List shards, + Boolean includeGlobalState, + long startTime, + long time + ) { + this(snapshot, state, shards, includeGlobalState, startTime, time, 0L); + } + SnapshotStatus( Snapshot snapshot, State state, @@ -217,6 +229,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(includeGlobalState); out.writeLong(stats.getStartTime()); out.writeLong(stats.getTime()); + // TODO: change version to 2_18_0 if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeOptionalLong(initialTotalSizeInBytes); } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java index 200dd53b9fe5a..8437446b0265f 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterInfo.java @@ -96,15 +96,7 @@ public ClusterInfo( final Map reservedSpace, final Map nodeFileCacheStats ) { - this( - leastAvailableSpaceUsage, - mostAvailableSpaceUsage, - shardSizes, - routingToDataPath, - reservedSpace, - nodeFileCacheStats, - 0L - ); + this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, routingToDataPath, reservedSpace, nodeFileCacheStats, 0L); } /** @@ -155,6 +147,7 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.nodeFileCacheStats = Map.of(); } + // TODO: change version to 2_18_0 if (in.getVersion().onOrAfter(Version.CURRENT)) { this.primaryStoreSize = in.readOptionalLong(); } else { @@ -205,6 +198,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o)); } + // TODO: change version to 2_18_0 if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeOptionalLong(this.primaryStoreSize); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 6d34db35c8bc1..d300e17ccdb78 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -528,6 +528,9 @@ public SnapshotInfo(final StreamInput in) throws IOException { } if (in.getVersion().onOrAfter(Version.V_2_17_0)) { pinnedTimestamp = in.readVLong(); + } + // TODO: change version to 2_18_0 + if (in.getVersion().onOrAfter(Version.CURRENT)) { snapshotSizeInBytes = in.readVLong(); } } @@ -1012,6 +1015,9 @@ public void writeTo(final StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(Version.V_2_17_0)) { out.writeVLong(pinnedTimestamp); + } + // TODO: change version to 2_18_0 + if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeVLong(snapshotSizeInBytes); } }