Integrate local recovery with remote store seeding during migration #12922

Merged
org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java (new file)
@@ -0,0 +1,179 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
String indexName = "idx1";
int numOfNodes = randomIntBetween(6, 9);

/**
* Tests local recovery sanity in the happy path flow
*/
public void testLocalRecoveryRollingRestart() throws Exception {
triggerRollingRestartForRemoteMigration(0);
internalCluster().stopAllNodes();
}

/**
* Tests local recovery sanity during remote migration with a node restart in between
*/
public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
triggerRollingRestartForRemoteMigration(0);

DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
DiscoveryNode nodeToRestart = (DiscoveryNode) discoveryNodes.getDataNodes().values().toArray()[randomIntBetween(0, numOfNodes - 4)];
internalCluster().restartNode(nodeToRestart.getName());

Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
for (Map.Entry<ShardRouting, ShardStats> entry : shardStatsMap.entrySet()) {
ShardRouting shardRouting = entry.getKey();
ShardStats shardStats = entry.getValue();
if (nodeToRestart.getId().equals(shardRouting.currentNodeId())) {
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}

assertBusy(() -> {
String shardPath = getShardLevelBlobPath(
client(),
indexName,
new BlobPath(),
String.valueOf(shardRouting.getId()),
SEGMENTS,
DATA
).buildAsString();
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
.filter(path -> path.getFileName().toString().contains("segments_"))
.map(path -> path.getFileName().toString())
.collect(Collectors.toList());
Set<String> expectedUniqueSegmentsNFiles = segmentsNFilesInRepo.stream()
.map(fileName -> fileName.split(SEGMENT_NAME_UUID_SEPARATOR)[0])
.collect(Collectors.toSet());
assertEquals(
"Expected no duplicate segments_N files in remote but duplicates were found " + segmentsNFilesInRepo,
expectedUniqueSegmentsNFiles.size(),
segmentsNFilesInRepo.size()
);
}, 90, TimeUnit.SECONDS);
}

internalCluster().stopAllNodes();
}

/**
* Tests local recovery sanity in the happy path flow with replicas in place
*/
public void testLocalRecoveryFlowWithReplicas() throws Exception {
triggerRollingRestartForRemoteMigration(randomIntBetween(1, 2));
internalCluster().stopAllNodes();
}

/**
* Helper method to run a rolling restart for migrating to a remote-store-backed cluster
*/
private void triggerRollingRestartForRemoteMigration(int replicaCount) throws Exception {
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startNodes(numOfNodes - 3);

// create index
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
.build();
createIndex(indexName, indexSettings);
ensureGreen(indexName);
indexBulk(indexName, randomIntBetween(100, 10000));
refresh(indexName);
indexBulk(indexName, randomIntBetween(100, 10000));

initDocRepToRemoteMigration();

// rolling restart
final Settings remoteNodeAttributes = remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath
);
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
// Update remote attributes
@Override
public Settings onNodeStopped(String nodeName) {
return remoteNodeAttributes;
}
});
ensureStableCluster(numOfNodes);
ensureGreen(TimeValue.timeValueSeconds(90), indexName);
assertEquals(numOfNodes, internalCluster().size());

// Assert on remote uploads
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
shardStatsMap.forEach((shardRouting, shardStats) -> {
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}
});

// Assert on new remote uploads after seeding
indexBulk(indexName, randomIntBetween(100, 10000));
refresh(indexName);
indexBulk(indexName, randomIntBetween(100, 10000));
Map<ShardRouting, ShardStats> newShardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
newShardStatsMap.forEach((shardRouting, shardStats) -> {
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
RemoteSegmentStats prevRemoteSegmentStats = shardStatsMap.get(shardRouting)
.getStats()
.getSegments()
.getRemoteSegmentStats();
RemoteSegmentStats newRemoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(newRemoteSegmentStats.getTotalUploadTime() > prevRemoteSegmentStats.getTotalUploadTime());
assertTrue(newRemoteSegmentStats.getUploadBytesSucceeded() > prevRemoteSegmentStats.getUploadBytesSucceeded());

RemoteTranslogStats prevRemoteTranslogStats = shardStatsMap.get(shardRouting)
.getStats()
.getTranslog()
.getRemoteTranslogStats();
RemoteTranslogStats newRemoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats();
assertTrue(newRemoteTranslogStats.getUploadBytesSucceeded() > prevRemoteTranslogStats.getUploadBytesSucceeded());
}
});
}
}
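Note: initDocRepToRemoteMigration() and remoteStoreClusterSettings(...) come from MigrationBaseTestCase and are not part of this diff. A minimal sketch of what such a migration-switch helper typically does — the setting keys "remote_store.compatibility_mode" and "migration.direction" are assumptions about the feature, not copied from the base class:

    // --- Illustrative sketch (not part of this PR's diff) ---
    // Assumes a static import of assertAcked and the usual integ-test plumbing.
    private void initDocRepToRemoteMigrationSketch() {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(
            Settings.builder()
                .put("remote_store.compatibility_mode", "mixed") // assumed key: allow docrep + remote nodes
                .put("migration.direction", "remote_store") // assumed key: migrate towards remote store
        );
        assertAcked(internalCluster().client().admin().cluster().updateSettings(request).actionGet());
    }
    // --- end sketch ---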
server/src/main/java/org/opensearch/index/IndexService.java
@@ -500,13 +500,14 @@ public synchronized IndexShard createShard(
if (this.indexSettings.isRemoteStoreEnabled()) {
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
} else {
-if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) {
+if (sourceNode == null || sourceNode.isRemoteStoreNode() == false) {
if (routing.primary() == false) {
throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId());
}
logger.info("DocRep shard {} is migrating to remote", shardId);
seedRemote = true;
}

remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
this.indexSettings.getUUID(),
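The condition change above is the crux of this file's diff: previously, seeding was only considered when a peer-recovery sourceNode existed and was non-remote, so a primary recovering from its local store on a freshly remote-enabled node skipped seeding. Allowing sourceNode == null folds local recovery into the same path. A paraphrased, standalone sketch of the rule (names mirror the hunk; this is not the exact OpenSearch source):

    // --- Illustrative sketch (not part of this PR's diff) ---
    static boolean shouldSeedRemote(DiscoveryNode sourceNode, ShardRouting routing) {
        // No source node (local/store recovery) or a docrep source node means the
        // shard's data is not in the remote store yet, so the primary must upload it.
        if (sourceNode == null || sourceNode.isRemoteStoreNode() == false) {
            if (routing.primary() == false) {
                // A replica cannot seed the remote store; its primary must migrate first.
                throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId());
            }
            return true;
        }
        return false; // source is already remote-backed; nothing to seed
    }
    // --- end sketch ---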
server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -1106,12 +1106,11 @@ private ReplicationGroup calculateReplicationGroup() {
} else {
newVersion = replicationGroup.getVersion() + 1;
}
-assert indexSettings.isRemoteTranslogStoreEnabled()
-// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
-|| (replicationGroup != null
-&& replicationGroup.getReplicationTargets()
-.stream()
-.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId())))
+assert newVersion == 0 || indexSettings.isRemoteTranslogStoreEnabled()
+// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
+|| replicationGroup.getReplicationTargets()
+.stream()
+.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))
|| checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
: "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION";

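The reworked assertion adds a newVersion == 0 escape hatch (a just-initialized replication group has nothing to verify yet), which in turn makes the old replicationGroup != null guard redundant. Spelled out as a predicate — the parameter names below are placeholders, not ReplicationTracker fields:

    // --- Illustrative sketch (not part of this PR's diff) ---
    static boolean replicationInvariantHolds(
        long newVersion,
        boolean remoteTranslogEnabled,
        boolean anyCopyOnRemoteEnabledNode,
        boolean allTrackedCheckpointsReplicated
    ) {
        return newVersion == 0 // freshly created replication group
            || remoteTranslogEnabled // remote translog store in use
            || anyCopyOnRemoteEnabledNode // migration in flight within the group
            || allTrackedCheckpointsReplicated; // plain logical replication everywhere
    }
    // --- end sketch ---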
server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -436,7 +436,7 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
-this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
+this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId, seedRemote);
final String aId = shardRouting.allocationId().getId();
final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
@@ -5000,7 +5000,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
-logger
+logger,
+shouldSeedRemoteStore()
);
}
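For orientation, the new seedRemote flag threads from the shard level down to the translog download path; a condensed, paraphrased view of the call chain this PR touches:

    // --- Illustrative sketch (not part of this PR's diff) ---
    // IndexShard.syncTranslogFilesFromRemoteTranslog()
    //   -> RemoteFsTranslog.download(repository, shardId, ..., logger, shouldSeedRemoteStore())
    //      -> RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote)
    //         -> RemoteFsTranslog.downloadOnce(translogTransferManager, location, logger, seedRemote)
    //            // seedRemote == true:  keep local translog files (they will seed the remote store)
    //            // seedRemote == false: clean up local files that have no remote metadata
    // --- end sketch ---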

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -650,6 +650,14 @@
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
+if (indexShard.shouldSeedRemoteStore()) {
+indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
+logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId());
+indexShard.refresh("remote store migration");
+});
+indexShard.waitForRemoteStoreSync();
logger.info("Remote Store is now seeded via local recovery for {}", indexShard.shardId());

+}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
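The seeding block above leans on the remote-store refresh machinery: the refresh is dispatched to the GENERIC pool so the recovery thread stays free, and waitForRemoteStoreSync() then blocks until the refresh-driven segment upload catches up. A compressed view of the sequence (simplified; assumes the shard's remote-store refresh listener performs the uploads):

    // --- Illustrative sketch (not part of this PR's diff) ---
    // 1. openEngineAndRecoverFromTranslog()  -> local data is live
    // 2. refresh("remote store migration")   -> async on GENERIC; the refresh listener uploads new segments
    // 3. waitForRemoteStoreSync()            -> recovery blocks until remote matches local
    // 4. fillSeqNoGaps / finalizeRecovery    -> shard proceeds as a remote-backed primary
    // --- end sketch ---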
server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -120,7 +120,7 @@
remoteStoreSettings
);
try {
-download(translogTransferManager, location, logger);
+download(translogTransferManager, location, logger, config.shouldSeedRemote());
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
@@ -168,7 +168,8 @@
Path location,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
-Logger logger
+Logger logger,
+boolean seedRemote
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Locale.ROOT,
@@ -189,11 +190,12 @@
pathStrategy,
remoteStoreSettings
);
-RemoteFsTranslog.download(translogTransferManager, location, logger);
+RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
logger.trace(remoteTranslogTransferTracker.toString());
}

-static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
+static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
+throws IOException {
/*
In primary-to-primary relocation, there can be concurrent upload and download of translog.
While translog files are being downloaded by the new primary, they might meanwhile be deleted by the primary
@@ -206,7 +208,7 @@
boolean success = false;
long startTimeMs = System.currentTimeMillis();
try {
-downloadOnce(translogTransferManager, location, logger);
+downloadOnce(translogTransferManager, location, logger, seedRemote);
success = true;
return;
} catch (FileNotFoundException | NoSuchFileException e) {
@@ -220,7 +222,8 @@
throw ex;
}

-private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
+private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
+throws IOException {
logger.debug("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
@@ -262,7 +265,9 @@
logger.debug("No translog files found on remote, checking local filesystem for cleanup");
if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) {
final Checkpoint checkpoint = readCheckpoint(location);
-if (isEmptyTranslog(checkpoint) == false) {
+if (seedRemote) {
+logger.debug("Remote migration ongoing. Retaining the translog on local, skipping clean-up");
+} else if (isEmptyTranslog(checkpoint) == false) {
logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files");
// Creating an empty translog will clean up the older un-referenced translog files; we don't have to explicitly delete
Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint);
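Net effect of the seedRemote branch: when the remote store has no translog metadata, local translog files were previously treated as stale leftovers and reset, but during migration they are the only copy and must survive to seed the remote store. As a decision table (paraphrased):

    // --- Illustrative sketch (not part of this PR's diff) ---
    // No metadata in remote + local checkpoint present:
    //   seedRemote == true           -> retain local translog (it seeds the remote store)
    //   local translog is non-empty  -> replace with an empty translog (stale local leftovers)
    //   otherwise                    -> nothing to clean up
    // --- end sketch ---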
server/src/main/java/org/opensearch/index/translog/TranslogConfig.java
@@ -59,16 +59,25 @@ public final class TranslogConfig
private final Path translogPath;
private final ByteSizeValue bufferSize;
private final String nodeId;
+private final boolean seedRemote;

/**
* Creates a new TranslogConfig instance
* @param shardId the shard ID this translog belongs to
* @param translogPath the path to use for the transaction log files
* @param indexSettings the index settings used to set internal variables
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
+* @param seedRemote boolean denoting whether remote store needs to be seeded as part of remote migration
*/
-public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
-this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
+public TranslogConfig(
+ShardId shardId,
+Path translogPath,
+IndexSettings indexSettings,
+BigArrays bigArrays,
+String nodeId,
+boolean seedRemote
+) {
+this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId, seedRemote);
}

TranslogConfig(
@@ -77,14 +86,16 @@
IndexSettings indexSettings,
BigArrays bigArrays,
ByteSizeValue bufferSize,
-String nodeId
+String nodeId,
+boolean seedRemote
) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.bigArrays = bigArrays;
this.nodeId = nodeId;
+this.seedRemote = seedRemote;
}

/**
@@ -125,4 +136,8 @@ public ByteSizeValue getBufferSize() {
public String getNodeId() {
return nodeId;
}

+public boolean shouldSeedRemote() {
+return seedRemote;
+}
}
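A hypothetical construction site for the widened constructor — every value below is a placeholder, with true standing in for a shard flagged for docrep-to-remote seeding; downstream, RemoteFsTranslog reads the flag back via config.shouldSeedRemote():

    // --- Illustrative sketch (not part of this PR's diff) ---
    static TranslogConfig migrationConfigSketch(ShardId shardId, ShardPath shardPath, IndexSettings indexSettings, String nodeId) {
        return new TranslogConfig(
            shardId,
            shardPath.resolveTranslog(),
            indexSettings,
            BigArrays.NON_RECYCLING_INSTANCE,
            nodeId,
            true // seedRemote: this shard still has to seed the remote store
        );
    }
    // --- end sketch ---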
@@ -195,7 +195,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
translogPath,
indexSettings,
BigArrays.NON_RECYCLING_INSTANCE,
""
"",
false
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.