Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SnapshotV2] Add snapshot size to status response for shallow V2 snapshots #15573

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolea
if (hasIndexFilter) {
assertEquals(0, snapshotStatus.getStats().getTotalSize());
} else {
// TODO: after adding primary store size at the snapshot level, total size here should be > 0
assertTrue(snapshotStatus.getStats().getTotalSize() > 0L);
}
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.snapshots.status;

import org.opensearch.Version;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -86,6 +87,8 @@

private SnapshotStats stats;

private final long initialTotalSizeInBytes;

@Nullable
private final Boolean includeGlobalState;

Expand All @@ -96,7 +99,13 @@
includeGlobalState = in.readOptionalBoolean();
final long startTime = in.readLong();
final long time = in.readLong();
updateShardStats(startTime, time);
// TODO: change version to 2_18_0
if (in.getVersion().onOrAfter(Version.CURRENT)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While checking this in 'main', please could you help add a TODO comment with the specific version this needs to be replaced with in the follow-up PR? Applicable for all places we have added 'Version.CURRENT'

initialTotalSizeInBytes = in.readOptionalLong();

Check warning on line 104 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L104

Added line #L104 was not covered by tests
} else {
initialTotalSizeInBytes = 0L;

Check warning on line 106 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L106

Added line #L106 was not covered by tests
}
updateShardStats(startTime, time, initialTotalSizeInBytes);

Check warning on line 108 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L108

Added line #L108 was not covered by tests
}

SnapshotStatus(
Expand All @@ -106,14 +115,28 @@
Boolean includeGlobalState,
long startTime,
long time
) {
this(snapshot, state, shards, includeGlobalState, startTime, time, 0L);
}

Check warning on line 120 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L119-L120

Added lines #L119 - L120 were not covered by tests

SnapshotStatus(
Snapshot snapshot,
State state,
List<SnapshotIndexShardStatus> shards,
Boolean includeGlobalState,
long startTime,
long time,
long initialTotalSizeInBytes
) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
assert time >= 0 : "time must be >= 0 but received [" + time + "]";
updateShardStats(startTime, time);
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
assert initialTotalSizeInBytes >= 0 : "initialTotalSizeInBytes must be >= 0 but received [" + initialTotalSizeInBytes + "]";
updateShardStats(startTime, time, initialTotalSizeInBytes);
}

private SnapshotStatus(
Expand All @@ -123,7 +146,8 @@
Map<String, SnapshotIndexStatus> indicesStatus,
SnapshotShardsStats shardsStats,
SnapshotStats stats,
Boolean includeGlobalState
Boolean includeGlobalState,
long initialTotalSizeInBytes
) {
this.snapshot = snapshot;
this.state = state;
Expand All @@ -132,6 +156,7 @@
this.shardsStats = shardsStats;
this.stats = stats;
this.includeGlobalState = includeGlobalState;
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
}

/**
Expand Down Expand Up @@ -204,6 +229,10 @@
out.writeOptionalBoolean(includeGlobalState);
out.writeLong(stats.getStartTime());
out.writeLong(stats.getTime());
// TODO: change version to 2_18_0
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalLong(initialTotalSizeInBytes);

Check warning on line 234 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java#L234

Added line #L234 was not covered by tests
}
}

@Override
Expand Down Expand Up @@ -276,7 +305,7 @@
shards.addAll(index.getShards().values());
}
}
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState);
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState, 0L);
}
);
static {
Expand All @@ -299,8 +328,8 @@
return PARSER.parse(parser, null);
}

private void updateShardStats(long startTime, long time) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
private void updateShardStats(long startTime, long time, long initialTotalSizeInBytes) {
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSizeInBytes, 0);
shardsStats = new SnapshotShardsStats(shards);
for (SnapshotIndexShardStatus shard : shards) {
// BWC: only update timestamps when we did not get a start time from an old node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@
Collections.unmodifiableList(shardStatusBuilder),
entry.includeGlobalState(),
entry.startTime(),
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L),

Check warning on line 301 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java#L301

Added line #L301 was not covered by tests
0L
)
);
}
Expand Down Expand Up @@ -345,7 +346,7 @@
boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0;
long initialSnapshotTotalSize = 0;
if (isShallowV2Snapshot && request.indices().length == 0) {
// TODO: add primary store size in bytes at the snapshot level
initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes();

Check warning on line 349 in server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java#L349

Added line #L349 was not covered by tests
}

for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
Expand Down Expand Up @@ -378,7 +379,8 @@
snapshotInfo.includeGlobalState(),
startTime,
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime,
initialSnapshotTotalSize
)
);
}
Expand Down
46 changes: 45 additions & 1 deletion server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
final long primaryStoreSize;

private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L);
}

/**
Expand All @@ -93,13 +95,37 @@
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, routingToDataPath, reservedSpace, nodeFileCacheStats, 0L);
}

Check warning on line 100 in server/src/main/java/org/opensearch/cluster/ClusterInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/ClusterInfo.java#L99-L100

Added lines #L99 - L100 were not covered by tests

/**
* Creates a new ClusterInfo instance.
*
* @param leastAvailableSpaceUsage a node id to disk usage mapping for the path that has the least available space on the node.
* @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node.
* @param shardSizes a shardkey to size in bytes mapping per shard.
* @param routingToDataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param primaryStoreSize total size in bytes for all the primary shards
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
final Map<String, DiskUsage> leastAvailableSpaceUsage,
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats,
final long primaryStoreSize
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
this.primaryStoreSize = primaryStoreSize;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

Expand All @@ -121,6 +147,12 @@
} else {
this.nodeFileCacheStats = Map.of();
}
// TODO: change version to 2_18_0
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.primaryStoreSize = in.readOptionalLong();
} else {
this.primaryStoreSize = 0L;

Check warning on line 154 in server/src/main/java/org/opensearch/cluster/ClusterInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/ClusterInfo.java#L154

Added line #L154 was not covered by tests
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}
Expand Down Expand Up @@ -166,6 +198,10 @@
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
// TODO: change version to 2_18_0
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalLong(this.primaryStoreSize);
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down Expand Up @@ -220,6 +256,7 @@
}
}
builder.endArray(); // end "reserved_sizes"
builder.field("primary_store_size", this.primaryStoreSize);

Check warning on line 259 in server/src/main/java/org/opensearch/cluster/ClusterInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/ClusterInfo.java#L259

Added line #L259 was not covered by tests
return builder;
}

Expand All @@ -246,6 +283,13 @@
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the total size in bytes for all the primary shards
*/
public long getPrimaryStoreSize() {
return primaryStoreSize;
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager

private volatile long primaryStoreSize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we already have final Map<String, Long> shardSizes; , which store shard level sizes. Can we just use this to retrieve shard sizes for v2 ?

private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
Expand All @@ -127,6 +129,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.primaryStoreSize = 0L;
this.threadPool = threadPool;
this.client = client;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -213,7 +216,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
nodeFileCacheStats,
primaryStoreSize
);
}

Expand Down Expand Up @@ -305,8 +309,13 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
final Map<ShardRouting, String> dataPathByShardRoutingBuilder = new HashMap<>();
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = new HashMap<>();
buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders);

primaryStoreSize = buildShardLevelInfo(
logger,
stats,
shardSizeByIdentifierBuilder,
dataPathByShardRoutingBuilder,
reservedSpaceBuilders
);
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> rsrvdSpace = new HashMap<>();
reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build()));

Expand Down Expand Up @@ -366,13 +375,14 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(
static long buildShardLevelInfo(
Logger logger,
ShardStats[] stats,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> newShardRoutingToDataPath,
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard
) {
long currentPrimaryStoreSize = 0L;
for (ShardStats s : stats) {
final ShardRouting shardRouting = s.getShardRouting();
newShardRoutingToDataPath.put(shardRouting, s.getDataPath());
Expand All @@ -382,6 +392,9 @@ static void buildShardLevelInfo(
continue;
}
final long size = storeStats.sizeInBytes();
if (shardRouting.primary()) {
currentPrimaryStoreSize += size;
}
final long reserved = storeStats.getReservedSize().getBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
Expand All @@ -396,6 +409,7 @@ static void buildShardLevelInfo(
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
}
}
return currentPrimaryStoreSize;
}

static void fillDiskUsagePerNode(
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ protected Node(
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
clusterInfoService,
actionModule.getActionFilters(),
remoteStorePinnedTimestampService,
remoteStoreSettings
Expand Down
Loading
Loading