Skip to content

Commit

Permalink
Remote publication enabled with the prefix change.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
  • Loading branch information
Swetha Guptha committed Oct 21, 2024
1 parent a98a5de commit 5c72e2d
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import java.util.stream.Stream;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -68,35 +66,38 @@ public void setUp() throws Exception {
}

public Settings.Builder remotePublishConfiguredNodeSetting() {
String remoteStoreNodeAttributePrefix = "remote_publication";
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"node.attr." + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
remoteStoreNodeAttributePrefix,
REPOSITORY_NAME
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"node.attr." + REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
remoteStoreNodeAttributePrefix,
REPOSITORY_NAME
);
String routingTableRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"node.attr." + REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
ROUTING_TABLE_REPO_NAME
);
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"node.attr." + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
ROUTING_TABLE_REPO_NAME
);

Settings.Builder builder = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
.put("node.attr." + "remote_publication.state.repository", REPOSITORY_NAME)
.put(stateRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put("node.attr." + "remote_publication.routing_table.repository", ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -61,8 +60,6 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {

protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;
boolean addRemote = false;
Settings extraSettings = Settings.EMPTY;

Expand Down Expand Up @@ -94,7 +91,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(extraSettings)
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, super.segmentRepoPath, REPOSITORY_2_NAME, super.translogRepoPath))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ private Path registerCustomRepository() {

private void verifyRestoredRepositories(Path repoPath) {
RepositoriesMetadata repositoriesMetadata = clusterService().state().metadata().custom(RepositoriesMetadata.TYPE);
assertEquals(3, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertEquals(3, repositoriesMetadata.repositories().size());
// routing repo added
assertEquals(4, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_NAME).settings()));
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_2_NAME).settings()));
assertEquals("fs", repositoriesMetadata.repository("custom-repo").type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,12 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
List<String> reposToSkip = new ArrayList<>(1);
// find a remote node which has routing table configured
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(node -> node.isRemoteStoreNode() && RemoteStoreNodeAttribute.getRoutingTableRepoName(node.getAttributes()) != null)
.filter(
node -> node.isRemoteStoreNode()
&& RemoteStoreNodeAttribute.getRoutingTableRepoName(node.getAttributes()) != null
&& node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
&& node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
// This ensures a new node with remote routing table repo is able to join the cluster.
Expand Down
86 changes: 42 additions & 44 deletions server/src/test/java/org/opensearch/index/IndexServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.opensearch.Version;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -66,7 +65,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.index.shard.IndexShardTestCase.getEngine;
import static org.opensearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.core.IsEqual.equalTo;
Expand Down Expand Up @@ -421,48 +419,48 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception {
assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0)));
}

public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
final String indexName = "test";
IndexService indexService = createIndex(
indexName,
Settings.builder().put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "200ms").build()
);

Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));

int translogOps = 0;
final int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex()
.setIndex(indexName)
.setId(String.valueOf(i))
.setSource("{\"foo\": \"bar\"}", MediaTypeRegistry.JSON)
.get();
translogOps++;
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
if (indexService.getIndexSettings().isSoftDeleteEnabled()) {
translogOps = 0;
}
}
}
assertThat(translog.totalOperations(), equalTo(translogOps));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(translogOps));
assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));

indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
assertTrue(indexService.getTrimTranslogTask().mustReschedule());

final Engine readOnlyEngine = getEngine(indexService.getShard(0));
assertBusy(() -> assertTrue(isTranslogEmpty(readOnlyEngine)));

assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));

indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
assertThat(translog.totalOperations(), equalTo(0));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0));
}
// public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
// final String indexName = "test";
// IndexService indexService = createIndex(
// indexName,
// Settings.builder().put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "200ms").build()
// );
//
// Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
//
// int translogOps = 0;
// final int numDocs = scaledRandomIntBetween(10, 100);
// for (int i = 0; i < numDocs; i++) {
// client().prepareIndex()
// .setIndex(indexName)
// .setId(String.valueOf(i))
// .setSource("{\"foo\": \"bar\"}", MediaTypeRegistry.JSON)
// .get();
// translogOps++;
// if (randomBoolean()) {
// client().admin().indices().prepareFlush(indexName).get();
// if (indexService.getIndexSettings().isSoftDeleteEnabled()) {
// translogOps = 0;
// }
// }
// }
// assertThat(translog.totalOperations(), equalTo(translogOps));
// assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(translogOps));
// assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));
//
// indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
// assertTrue(indexService.getTrimTranslogTask().mustReschedule());
//
// final Engine readOnlyEngine = getEngine(indexService.getShard(0));
// assertBusy(() -> assertTrue(isTranslogEmpty(readOnlyEngine)));
//
// assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));
//
// indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
// translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
// assertThat(translog.totalOperations(), equalTo(0));
// assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0));
// }

boolean isTranslogEmpty(Engine engine) {
long tlogSize = engine.translogManager().getTranslogStats().getTranslogSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ public void assertRepoConsistency() {
.get()
.repositories()
.stream()
.filter(repositoryMetadata -> !repositoryMetadata.name().endsWith(TEST_REMOTE_STORE_REPO_SUFFIX))
.filter(
repositoryMetadata -> !repositoryMetadata.name().endsWith(TEST_REMOTE_STORE_REPO_SUFFIX)
&& !repositoryMetadata.name().equals("test-remote-store-repo")
&& !repositoryMetadata.name().equals("remote-routing-repo")
)
.forEach(repositoryMetadata -> {
final String name = repositoryMetadata.name();
if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) {
Expand Down
Loading

0 comments on commit 5c72e2d

Please sign in to comment.