Integrate local recovery with remote store seeding during migration #13376

Closed
server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryLocalRecoveryIT.java (new file)
@@ -0,0 +1,179 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.remotemigration;

import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
    String indexName = "idx1";
    int numOfNodes = randomIntBetween(6, 9);

    /**
     * Tests local recovery sanity in the happy path flow
     */
    public void testLocalRecoveryRollingRestart() throws Exception {
        triggerRollingRestartForRemoteMigration(0);
        internalCluster().stopAllNodes();
    }

    /**
     * Tests local recovery sanity during remote migration with a node restart in between
     */
    public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
        triggerRollingRestartForRemoteMigration(0);

        DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
        DiscoveryNode nodeToRestart = (DiscoveryNode) discoveryNodes.getDataNodes().values().toArray()[randomIntBetween(0, numOfNodes - 4)];
        internalCluster().restartNode(nodeToRestart.getName());

        Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
        for (Map.Entry<ShardRouting, ShardStats> entry : shardStatsMap.entrySet()) {
            ShardRouting shardRouting = entry.getKey();
            ShardStats shardStats = entry.getValue();
            // Compare node IDs: currentNodeId() returns a String, so match it against the restarted node's ID
            if (nodeToRestart.getId().equals(shardRouting.currentNodeId())) {
                RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
                assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
                assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
            }

            assertBusy(() -> {
                String shardPath = getShardLevelBlobPath(
                    client(),
                    indexName,
                    new BlobPath(),
                    String.valueOf(shardRouting.getId()),
                    SEGMENTS,
                    DATA
                ).buildAsString();
                Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
                List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
                    .filter(path -> path.getFileName().toString().contains("segments_"))
                    .map(path -> path.getFileName().toString())
                    .collect(Collectors.toList());
                Set<String> expectedUniqueSegmentsNFiles = segmentsNFilesInRepo.stream()
                    .map(fileName -> fileName.split(SEGMENT_NAME_UUID_SEPARATOR)[0])
                    .collect(Collectors.toSet());
                assertEquals(
                    "Expected no duplicate segments_N files in remote but duplicates were found " + segmentsNFilesInRepo,
                    expectedUniqueSegmentsNFiles.size(),
                    segmentsNFilesInRepo.size()
                );
            }, 90, TimeUnit.SECONDS);
        }

        internalCluster().stopAllNodes();
    }

    /**
     * Tests local recovery flow sanity in the happy path flow with replicas in place
     */
    public void testLocalRecoveryFlowWithReplicas() throws Exception {
        triggerRollingRestartForRemoteMigration(randomIntBetween(1, 2));
        internalCluster().stopAllNodes();
    }

    /**
     * Helper method to run a rolling restart for migration to a remote-backed cluster
     */
    private void triggerRollingRestartForRemoteMigration(int replicaCount) throws Exception {
        internalCluster().startClusterManagerOnlyNodes(3);
        internalCluster().startNodes(numOfNodes - 3);

        // create index
        Settings indexSettings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount)
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
            .build();
        createIndex(indexName, indexSettings);
        ensureGreen(indexName);
        indexBulk(indexName, randomIntBetween(100, 10000));
        refresh(indexName);
        indexBulk(indexName, randomIntBetween(100, 10000));

        initDocRepToRemoteMigration();

        // rolling restart
        final Settings remoteNodeAttributes = remoteStoreClusterSettings(
            REPOSITORY_NAME,
            segmentRepoPath,
            REPOSITORY_2_NAME,
            translogRepoPath
        );
        internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
            // Update remote attributes
            @Override
            public Settings onNodeStopped(String nodeName) {
                return remoteNodeAttributes;
            }
        });
        ensureStableCluster(numOfNodes);
        ensureGreen(TimeValue.timeValueSeconds(90), indexName);
        assertEquals(numOfNodes, internalCluster().size());

        // Assert on remote uploads
        Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
        DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
        shardStatsMap.forEach((shardRouting, shardStats) -> {
            if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
                RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
                assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
                assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
            }
        });

        // Assert on new remote uploads after seeding
        indexBulk(indexName, randomIntBetween(100, 10000));
        refresh(indexName);
        indexBulk(indexName, randomIntBetween(100, 10000));
        Map<ShardRouting, ShardStats> newShardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
        newShardStatsMap.forEach((shardRouting, shardStats) -> {
            if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
                RemoteSegmentStats prevRemoteSegmentStats = shardStatsMap.get(shardRouting)
                    .getStats()
                    .getSegments()
                    .getRemoteSegmentStats();
                RemoteSegmentStats newRemoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
                assertTrue(newRemoteSegmentStats.getTotalUploadTime() > prevRemoteSegmentStats.getTotalUploadTime());
                assertTrue(newRemoteSegmentStats.getUploadBytesSucceeded() > prevRemoteSegmentStats.getUploadBytesSucceeded());

                RemoteTranslogStats prevRemoteTranslogStats = shardStatsMap.get(shardRouting)
                    .getStats()
                    .getTranslog()
                    .getRemoteTranslogStats();
                RemoteTranslogStats newRemoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats();
                assertTrue(newRemoteTranslogStats.getUploadBytesSucceeded() > prevRemoteTranslogStats.getUploadBytesSucceeded());
            }
        });
    }
}
server/src/main/java/org/opensearch/index/IndexService.java
@@ -500,13 +500,14 @@ public synchronized IndexShard createShard(
         if (this.indexSettings.isRemoteStoreEnabled()) {
             remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
         } else {
-            if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) {
+            if (sourceNode == null || sourceNode.isRemoteStoreNode() == false) {
                 if (routing.primary() == false) {
                     throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId());
                 }
                 logger.info("DocRep shard {} is migrating to remote", shardId);
                 seedRemote = true;
             }
+
             remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
                 RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
                 this.indexSettings.getUUID(),
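
To make the seeding condition above easier to follow: during a docrep-to-remote migration, a primary that recovers either locally (no source node) or from a non-remote source node must seed the remote store itself, while a replica in that position is rejected outright. The standalone sketch below restates that branch with plain booleans; the class and method names are illustrative only and do not exist in the codebase.

// Illustrative restatement of the IndexService#createShard branch above; not part of the actual change.
final class SeedingDecisionSketch {
    static boolean shouldSeedRemote(boolean hasSourceNode, boolean sourceIsRemoteStoreNode, boolean isPrimary) {
        if (hasSourceNode == false || sourceIsRemoteStoreNode == false) {
            if (isPrimary == false) {
                // Mirrors the IllegalStateException thrown for replicas in createShard
                throw new IllegalStateException("Can't migrate a remote shard to replica before primary");
            }
            return true; // docrep primary moving to a remote-capable node: seed from local data
        }
        return false; // source is already a remote-store node: nothing to seed
    }
}
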
server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -1108,12 +1108,11 @@ private ReplicationGroup calculateReplicationGroup() {
         } else {
             newVersion = replicationGroup.getVersion() + 1;
         }
-        assert indexSettings.isRemoteTranslogStoreEnabled()
-            // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
-            || (replicationGroup != null
-                && replicationGroup.getReplicationTargets()
-                    .stream()
-                    .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId())))
+        assert newVersion == 0 || indexSettings.isRemoteTranslogStoreEnabled()
+            // Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
+            || replicationGroup.getReplicationTargets()
+                .stream()
+                .anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))
             || checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
             : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION";
 
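
The reworked assertion is a four-way disjunction that is easier to read when each clause is named. The sketch below restates it as a predicate; the method and parameter names are descriptive stand-ins, not identifiers from ReplicationTracker.

// Readable restatement of the assertion in calculateReplicationGroup(); names are illustrative.
final class ReplicationModeInvariantSketch {
    static boolean invariantHolds(
        boolean firstReplicationGroupVersion,   // newVersion == 0: the group is being computed for the first time
        boolean remoteTranslogEnabled,          // indexSettings.isRemoteTranslogStoreEnabled()
        boolean anyCopyOnRemoteEnabledNode,     // migration case: some copy already lives on a remote-enabled node
        boolean allTrackedCopiesReplicated      // every tracked checkpoint is marked replicated
    ) {
        return firstReplicationGroupVersion
            || remoteTranslogEnabled
            || anyCopyOnRemoteEnabledNode
            || allTrackedCopiesReplicated;
    }
}
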
13 changes: 11 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -438,7 +438,7 @@ public IndexShard(
         logger.debug("state: [CREATED]");
 
         this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
-        this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
+        this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId, seedRemote);
         final String aId = shardRouting.allocationId().getId();
         final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
         this.pendingPrimaryTerm = primaryTerm;
@@ -5005,7 +5005,16 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
         TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
         assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
         Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
-        RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog(), logger);
+        RemoteFsTranslog.download(
+            repository,
+            shardId,
+            getThreadPool(),
+            shardPath().resolveTranslog(),
+            indexSettings.getRemoteStorePathStrategy(),
+            remoteStoreSettings,
+            logger,
+            shouldSeedRemoteStore()
+        );
     }
 
     /**
server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
@@ -649,6 +649,14 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
             indexShard.recoveryState().getIndex().setFileDetailsComplete();
         }
         indexShard.openEngineAndRecoverFromTranslog();
+        if (indexShard.shouldSeedRemoteStore()) {
+            indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
+                logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId());
+                indexShard.refresh("remote store migration");
+            });
+            indexShard.waitForRemoteStoreSync();
+            logger.info("Remote Store is now seeded via local recovery for {}", indexShard.shardId());
+        }
         indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
         indexShard.finalizeRecovery();
         indexShard.postRecovery("post recovery from shard_store");
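
The seeding step above hands the refresh off to the GENERIC thread pool and then blocks the recovering thread until the remote store has caught up. A minimal sketch of that hand-off pattern, using plain java.util.concurrent types rather than OpenSearch's ThreadPool (the class and method names here are illustrative):

import java.util.concurrent.ExecutorService;

// Sketch of the "schedule the refresh, then wait for remote uploads" hand-off used during seeding.
final class RemoteSeedingHandOffSketch {
    static void seedThenWait(ExecutorService genericPool, Runnable refreshShard, Runnable waitForRemoteStoreSync) {
        // Run the refresh (which drives segment uploads) on a background pool,
        // so the recovering thread is free to wait for the uploads instead of doing the refresh inline.
        genericPool.execute(refreshShard);
        // Recovery only proceeds once the local data is visible in the remote store.
        waitForRemoteStoreSync.run();
    }
}
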
server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -110,7 +110,7 @@ public RemoteFsTranslog(
             remoteTranslogTransferTracker
         );
         try {
-            download(translogTransferManager, location, logger);
+            download(translogTransferManager, location, logger, config.shouldSeedRemote());
             Checkpoint checkpoint = readCheckpoint(location);
             logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
             this.readers.addAll(recoverFromFiles(checkpoint));
@@ -151,8 +151,16 @@ RemoteTranslogTransferTracker getRemoteTranslogTracker() {
         return remoteTranslogTransferTracker;
     }
 
-    public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location, Logger logger)
-        throws IOException {
+    public static void download(
+        Repository repository,
+        ShardId shardId,
+        ThreadPool threadPool,
+        Path location,
+        RemoteStorePathStrategy pathStrategy,
+        RemoteStoreSettings remoteStoreSettings,
+        Logger logger,
+        boolean seedRemote
+    ) throws IOException {
         assert repository instanceof BlobStoreRepository : String.format(
             Locale.ROOT,
             "%s repository should be instance of BlobStoreRepository",
@@ -170,11 +178,12 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
             fileTransferTracker,
             remoteTranslogTransferTracker
         );
-        RemoteFsTranslog.download(translogTransferManager, location, logger);
+        RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
         logger.trace(remoteTranslogTransferTracker.toString());
     }
 
-    static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
+    static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
+        throws IOException {
         /*
         In Primary to Primary relocation , there can be concurrent upload and download of translog.
         While translog files are getting downloaded by new primary, it might hence be deleted by the primary
@@ -187,7 +196,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
         boolean success = false;
         long startTimeMs = System.currentTimeMillis();
         try {
-            downloadOnce(translogTransferManager, location, logger);
+            downloadOnce(translogTransferManager, location, logger, seedRemote);
             success = true;
             return;
         } catch (FileNotFoundException | NoSuchFileException e) {
@@ -201,7 +210,8 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
         throw ex;
     }
 
-    private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
+    private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
+        throws IOException {
         logger.debug("Downloading translog files from remote");
         RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
         long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
@@ -243,7 +253,9 @@ private static void downloadOnce(TranslogTransferManager translogTransferManager
             logger.debug("No translog files found on remote, checking local filesystem for cleanup");
             if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) {
                 final Checkpoint checkpoint = readCheckpoint(location);
-                if (isEmptyTranslog(checkpoint) == false) {
+                if (seedRemote) {
+                    logger.debug("Remote migration ongoing. Retaining the translog on local, skipping clean-up");
+                } else if (isEmptyTranslog(checkpoint) == false) {
                     logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files");
                     // Creating empty translog will cleanup the older un-referenced tranlog files, we don't have to explicitly delete
                     Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint);
server/src/main/java/org/opensearch/index/translog/TranslogConfig.java
@@ -59,16 +59,25 @@ public final class TranslogConfig {
     private final Path translogPath;
     private final ByteSizeValue bufferSize;
     private final String nodeId;
+    private final boolean seedRemote;
 
     /**
      * Creates a new TranslogConfig instance
      * @param shardId the shard ID this translog belongs to
      * @param translogPath the path to use for the transaction log files
      * @param indexSettings the index settings used to set internal variables
      * @param bigArrays a bigArrays instance used for temporarily allocating write operations
+     * @param seedRemote boolean denoting whether remote store needs to be seeded as part of remote migration
      */
-    public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
-        this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
+    public TranslogConfig(
+        ShardId shardId,
+        Path translogPath,
+        IndexSettings indexSettings,
+        BigArrays bigArrays,
+        String nodeId,
+        boolean seedRemote
+    ) {
+        this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId, seedRemote);
     }
 
     TranslogConfig(
@@ -77,14 +86,16 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet
         IndexSettings indexSettings,
         BigArrays bigArrays,
         ByteSizeValue bufferSize,
-        String nodeId
+        String nodeId,
+        boolean seedRemote
     ) {
         this.bufferSize = bufferSize;
         this.indexSettings = indexSettings;
         this.shardId = shardId;
         this.translogPath = translogPath;
        this.bigArrays = bigArrays;
         this.nodeId = nodeId;
+        this.seedRemote = seedRemote;
     }
 
     /**
@@ -125,4 +136,8 @@ public ByteSizeValue getBufferSize() {
     public String getNodeId() {
         return nodeId;
     }
+
+    public boolean shouldSeedRemote() {
+        return seedRemote;
+    }
 }
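
For callers, the visible change is the extra constructor argument and the shouldSeedRemote() accessor. The snippet below sketches how a test might build a config with seeding enabled; it leans on the OpenSearch test utilities IndexSettingsModule and BigArrays.NON_RECYCLING_INSTANCE, which are assumed to be available and are not part of this PR.

import java.nio.file.Path;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.test.IndexSettingsModule;

// Sketch: constructing a TranslogConfig that flags the shard for remote seeding during migration.
final class TranslogConfigSeedingSketch {
    static TranslogConfig seedingConfig(Path translogDir) {
        ShardId shardId = new ShardId("idx1", "_na_", 0);
        IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("idx1", Settings.EMPTY);
        // The trailing 'true' is the new seedRemote flag; everything else matches the previous constructor.
        return new TranslogConfig(shardId, translogDir, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, "node-0", true);
    }

    static boolean needsSeeding(TranslogConfig config) {
        return config.shouldSeedRemote(); // true for the config built above
    }
}

In production code the flag is not hard-coded: it is computed in IndexService#createShard (the seedRemote branch shown earlier) and threaded through the IndexShard constructor into TranslogConfig.
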