Skip to content

Commit

Permalink
Integrate local recovery with remote store seeding during migration (#…
Browse files Browse the repository at this point in the history
…12922) (#13428)

(cherry picked from commit a8008e2)

Signed-off-by: Bhumika Saini <sabhumik@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 56fc830 commit 22a53b8
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
String indexName = "idx1";
int numOfNodes = randomIntBetween(6, 9);

/**
* Tests local recovery sanity in the happy path flow
*/
public void testLocalRecoveryRollingRestart() throws Exception {
triggerRollingRestartForRemoteMigration(0);
internalCluster().stopAllNodes();
}

/**
* Tests local recovery sanity during remote migration with a node restart in between
*/
public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
triggerRollingRestartForRemoteMigration(0);

DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
DiscoveryNode nodeToRestart = (DiscoveryNode) discoveryNodes.getDataNodes().values().toArray()[randomIntBetween(0, numOfNodes - 4)];
internalCluster().restartNode(nodeToRestart.getName());

Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
for (Map.Entry<ShardRouting, ShardStats> entry : shardStatsMap.entrySet()) {
ShardRouting shardRouting = entry.getKey();
ShardStats shardStats = entry.getValue();
if (nodeToRestart.equals(shardRouting.currentNodeId())) {
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}

assertBusy(() -> {
String shardPath = getShardLevelBlobPath(
client(),
indexName,
new BlobPath(),
String.valueOf(shardRouting.getId()),
SEGMENTS,
DATA
).buildAsString();
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
.filter(path -> path.getFileName().toString().contains("segments_"))
.map(path -> path.getFileName().toString())
.collect(Collectors.toList());
Set<String> expectedUniqueSegmentsNFiles = segmentsNFilesInRepo.stream()
.map(fileName -> fileName.split(SEGMENT_NAME_UUID_SEPARATOR)[0])
.collect(Collectors.toSet());
assertEquals(
"Expected no duplicate segments_N files in remote but duplicates were found " + segmentsNFilesInRepo,
expectedUniqueSegmentsNFiles.size(),
segmentsNFilesInRepo.size()
);
}, 90, TimeUnit.SECONDS);
}

internalCluster().stopAllNodes();
}

/**
* Tests local recovery flow sanity in the happy path flow with replicas in place
*/
public void testLocalRecoveryFlowWithReplicas() throws Exception {
triggerRollingRestartForRemoteMigration(randomIntBetween(1, 2));
internalCluster().stopAllNodes();
}

/**
* Helper method to run a rolling restart for migration to remote backed cluster
*/
private void triggerRollingRestartForRemoteMigration(int replicaCount) throws Exception {
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startNodes(numOfNodes - 3);

// create index
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
.build();
createIndex(indexName, indexSettings);
ensureGreen(indexName);
indexBulk(indexName, randomIntBetween(100, 10000));
refresh(indexName);
indexBulk(indexName, randomIntBetween(100, 10000));

initDocRepToRemoteMigration();

// rolling restart
final Settings remoteNodeAttributes = remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath
);
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
// Update remote attributes
@Override
public Settings onNodeStopped(String nodeName) {
return remoteNodeAttributes;
}
});
ensureStableCluster(numOfNodes);
ensureGreen(TimeValue.timeValueSeconds(90), indexName);
assertEquals(internalCluster().size(), numOfNodes);

// Assert on remote uploads
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
shardStatsMap.forEach((shardRouting, shardStats) -> {
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}
});

// Assert on new remote uploads after seeding
indexBulk(indexName, randomIntBetween(100, 10000));
refresh(indexName);
indexBulk(indexName, randomIntBetween(100, 10000));
Map<ShardRouting, ShardStats> newShardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
newShardStatsMap.forEach((shardRouting, shardStats) -> {
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
RemoteSegmentStats prevRemoteSegmentStats = shardStatsMap.get(shardRouting)
.getStats()
.getSegments()
.getRemoteSegmentStats();
RemoteSegmentStats newRemoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertTrue(newRemoteSegmentStats.getTotalUploadTime() > prevRemoteSegmentStats.getTotalUploadTime());
assertTrue(newRemoteSegmentStats.getUploadBytesSucceeded() > prevRemoteSegmentStats.getUploadBytesSucceeded());

RemoteTranslogStats prevRemoteTranslogStats = shardStatsMap.get(shardRouting)
.getStats()
.getTranslog()
.getRemoteTranslogStats();
RemoteTranslogStats newRemoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats();
assertTrue(newRemoteTranslogStats.getUploadBytesSucceeded() > prevRemoteTranslogStats.getUploadBytesSucceeded());
}
});
}
}
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,14 @@ public synchronized IndexShard createShard(
if (this.indexSettings.isRemoteStoreEnabled()) {
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
} else {
if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) {
if (sourceNode == null || sourceNode.isRemoteStoreNode() == false) {
if (routing.primary() == false) {
throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId());
}
logger.info("DocRep shard {} is migrating to remote", shardId);
seedRemote = true;
}

remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
this.indexSettings.getUUID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,12 +1108,11 @@ private ReplicationGroup calculateReplicationGroup() {
} else {
newVersion = replicationGroup.getVersion() + 1;
}
assert indexSettings.isRemoteTranslogStoreEnabled()
// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
|| (replicationGroup != null
&& replicationGroup.getReplicationTargets()
.stream()
.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId())))
assert newVersion == 0 || indexSettings.isRemoteTranslogStoreEnabled()
// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
|| replicationGroup.getReplicationTargets()
.stream()
.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))
|| checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
: "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId, seedRemote);
final String aId = shardRouting.allocationId().getId();
final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
Expand Down Expand Up @@ -5012,7 +5012,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
logger
logger,
shouldSeedRemoteStore()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,14 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
if (indexShard.shouldSeedRemoteStore()) {
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId());
indexShard.refresh("remote store migration");
});
indexShard.waitForRemoteStoreSync();
logger.info("Remote Store is now seeded via local recovery for {}", indexShard.shardId());
}
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public RemoteFsTranslog(
remoteStoreSettings
);
try {
download(translogTransferManager, location, logger);
download(translogTransferManager, location, logger, config.shouldSeedRemote());
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
Expand Down Expand Up @@ -168,7 +168,8 @@ public static void download(
Path location,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
Logger logger
Logger logger,
boolean seedRemote
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Locale.ROOT,
Expand All @@ -189,11 +190,12 @@ public static void download(
pathStrategy,
remoteStoreSettings
);
RemoteFsTranslog.download(translogTransferManager, location, logger);
RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
logger.trace(remoteTranslogTransferTracker.toString());
}

static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
throws IOException {
/*
In Primary to Primary relocation , there can be concurrent upload and download of translog.
While translog files are getting downloaded by new primary, it might hence be deleted by the primary
Expand All @@ -206,7 +208,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
boolean success = false;
long startTimeMs = System.currentTimeMillis();
try {
downloadOnce(translogTransferManager, location, logger);
downloadOnce(translogTransferManager, location, logger, seedRemote);
success = true;
return;
} catch (FileNotFoundException | NoSuchFileException e) {
Expand All @@ -220,7 +222,8 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
throw ex;
}

private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
throws IOException {
logger.debug("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
Expand Down Expand Up @@ -262,7 +265,9 @@ private static void downloadOnce(TranslogTransferManager translogTransferManager
logger.debug("No translog files found on remote, checking local filesystem for cleanup");
if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) {
final Checkpoint checkpoint = readCheckpoint(location);
if (isEmptyTranslog(checkpoint) == false) {
if (seedRemote) {
logger.debug("Remote migration ongoing. Retaining the translog on local, skipping clean-up");
} else if (isEmptyTranslog(checkpoint) == false) {
logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files");
// Creating empty translog will cleanup the older un-referenced tranlog files, we don't have to explicitly delete
Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,25 @@ public final class TranslogConfig {
private final Path translogPath;
private final ByteSizeValue bufferSize;
private final String nodeId;
private final boolean seedRemote;

/**
* Creates a new TranslogConfig instance
* @param shardId the shard ID this translog belongs to
* @param translogPath the path to use for the transaction log files
* @param indexSettings the index settings used to set internal variables
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
* @param seedRemote boolean denoting whether remote store needs to be seeded as part of remote migration
*/
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
public TranslogConfig(
ShardId shardId,
Path translogPath,
IndexSettings indexSettings,
BigArrays bigArrays,
String nodeId,
boolean seedRemote
) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId, seedRemote);
}

TranslogConfig(
Expand All @@ -77,14 +86,16 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet
IndexSettings indexSettings,
BigArrays bigArrays,
ByteSizeValue bufferSize,
String nodeId
String nodeId,
boolean seedRemote
) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.bigArrays = bigArrays;
this.nodeId = nodeId;
this.seedRemote = seedRemote;
}

/**
Expand Down Expand Up @@ -125,4 +136,8 @@ public ByteSizeValue getBufferSize() {
public String getNodeId() {
return nodeId;
}

public boolean shouldSeedRemote() {
return seedRemote;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
translogPath,
indexSettings,
BigArrays.NON_RECYCLING_INSTANCE,
""
"",
false
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
Expand Down
Loading

0 comments on commit 22a53b8

Please sign in to comment.