Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote publication integ test #16384

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import java.util.stream.Stream;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -68,35 +66,40 @@ public void setUp() throws Exception {
}

public Settings.Builder remotePublishConfiguredNodeSetting() {
String remoteStoreNodeAttributePrefix = "remote_publication";
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"node.attr." + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
remoteStoreNodeAttributePrefix,
REPOSITORY_NAME
);
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"node.attr." + REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
remoteStoreNodeAttributePrefix,
REPOSITORY_NAME
);
String routingTableRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"node.attr." + REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
remoteStoreNodeAttributePrefix,
ROUTING_TABLE_REPO_NAME
);
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"node.attr." + REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
remoteStoreNodeAttributePrefix,
ROUTING_TABLE_REPO_NAME
);

Settings.Builder builder = Settings.builder()
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, REPOSITORY_NAME)
.put("node.attr." + "remote_publication.state.repository", REPOSITORY_NAME)
.put(stateRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put("node.attr." + "remote_publication.routing_table.repository", ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
Expand All @@ -111,7 +114,7 @@ public Settings.Builder remoteWithRoutingTableNodeSetting() {
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath,
REPOSITORY_NAME,
ROUTING_TABLE_REPO_NAME,
segmentRepoPath,
false
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -61,8 +60,6 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {

protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;
boolean addRemote = false;
Settings extraSettings = Settings.EMPTY;

Expand Down Expand Up @@ -94,7 +91,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(extraSettings)
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, super.segmentRepoPath, REPOSITORY_2_NAME, super.translogRepoPath))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ private Path registerCustomRepository() {

private void verifyRestoredRepositories(Path repoPath) {
RepositoriesMetadata repositoriesMetadata = clusterService().state().metadata().custom(RepositoriesMetadata.TYPE);
assertEquals(3, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertEquals(3, repositoriesMetadata.repositories().size());
// routing repo added
assertEquals(4, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_NAME).settings()));
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_2_NAME).settings()));
assertEquals("fs", repositoriesMetadata.repository("custom-repo").type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.persistent.PersistentTasksCustomMetadata;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -67,7 +68,8 @@

import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getClusterStateRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.getRoutingTableRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
Expand Down Expand Up @@ -517,7 +519,11 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi
.findFirst();

if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
List<String> repos = Arrays.asList(
getClusterStateRepoName(remotePublicationNode.get().getAttributes()),
getRoutingTableRepoName(remotePublicationNode.get().getAttributes())
);
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), repos);
}
}

Expand Down Expand Up @@ -548,14 +554,14 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
Optional<DiscoveryNode> remoteRoutingTableNode = existingNodes.stream()
.filter(
node -> node.isRemoteStoreNode()
&& node.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
&& RemoteStoreNodeAttribute.getRoutingTableRepoName(node.getAttributes()) != null
&& RemoteStoreNodeAttribute.getSegmentRepoName(node.getAttributes()) != null
)
.findFirst();
// If none of the existing nodes have routing table repo, then we skip this repo check if present in joining node.
// This ensures a new node with remote routing table repo is able to join the cluster.
if (remoteRoutingTableNode.isEmpty()) {
String joiningNodeRepoName = joiningNode.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
String joiningNodeRepoName = getRoutingTableRepoName(joiningNode.getAttributes());
if (joiningNodeRepoName != null) {
reposToSkip.add(joiningNodeRepoName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,12 +1166,8 @@ public static void updateRemoteStoreSettings(
.findFirst();

if (remoteNode.isPresent()) {
translogRepo = remoteNode.get()
.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
segmentRepo = remoteNode.get()
.getAttributes()
.get(RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
translogRepo = RemoteStoreNodeAttribute.getTranslogRepoName(remoteNode.get().getAttributes());
segmentRepo = RemoteStoreNodeAttribute.getSegmentRepoName(remoteNode.get().getAttributes());
if (segmentRepo != null && translogRepo != null) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -62,10 +63,9 @@
import java.util.stream.Stream;

import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isClusterStateRepoConfigured;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRoutingTableRepoConfigured;

/**
* A discovery node represents a node that is part of the cluster.
Expand Down Expand Up @@ -510,20 +510,15 @@ public boolean isSearchNode() {
* @return true if the node contains remote store node attributes, false otherwise
*/
public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
return isClusterStateRepoConfigured(this.getAttributes()) && RemoteStoreNodeAttribute.isSegmentRepoConfigured(this.getAttributes());
}

/**
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
*/
public boolean isRemoteStatePublicationEnabled() {
return this.getAttributes()
.keySet()
.stream()
.anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)))
&& this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
return isClusterStateRepoConfigured(this.getAttributes()) && isRoutingTableRepoConfigured(this.getAttributes());
}

/**
Expand Down Expand Up @@ -587,13 +582,16 @@ public String toString() {
sb.append('}');
}
if (!attributes.isEmpty()) {
sb.append(
attributes.entrySet()
.stream()
.filter(entry -> !entry.getKey().startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)) // filter remote_store attributes
// from logging to reduce noise.
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
sb.append(attributes.entrySet().stream().filter(entry -> {
for (String prefix : REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX) {
if (entry.getKey().startsWith(prefix)) {
return false;
}
}
return true;
}) // filter remote_store attributes
// from logging to reduce noise.
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -235,9 +234,7 @@ protected void doClose() throws IOException {
@Override
protected void doStart() {
assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
final String remoteStoreRepo = RemoteStoreNodeAttribute.getRoutingTableRepoName(settings);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata;
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -1069,9 +1068,8 @@ public void close() throws IOException {

public void start() {
assert isRemoteClusterStateConfigured(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
final String remoteStoreRepo = RemoteStoreNodeAttribute.getClusterStateRepoName(settings);

assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -70,11 +69,6 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {

private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s";
static final String TRANSLOG_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey()
+ RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
static final String SEGMENT_REPO_NAME_KEY = Node.NODE_ATTRIBUTES.getKey()
+ RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;

private static final Logger logger = LogManager.getLogger(RemoteIndexPathUploader.class);

private final Settings settings;
Expand Down Expand Up @@ -226,9 +220,8 @@ private void writePathToRemoteStore(
}
}

private Repository validateAndGetRepository(String repoSetting) {
final String repo = settings.get(repoSetting);
assert repo != null : "Remote " + repoSetting + " repository is not configured";
private Repository validateAndGetRepository(String repo) {
assert repo != null : "Remote repository is not configured";
final Repository repository = repositoriesService.get().repository(repo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
return repository;
Expand All @@ -240,15 +233,16 @@ public void start() {
// If remote store data attributes are not present than we skip this.
return;
}
translogRepository = (BlobStoreRepository) validateAndGetRepository(TRANSLOG_REPO_NAME_KEY);
segmentRepository = (BlobStoreRepository) validateAndGetRepository(SEGMENT_REPO_NAME_KEY);

translogRepository = (BlobStoreRepository) validateAndGetRepository(RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(settings));
segmentRepository = (BlobStoreRepository) validateAndGetRepository(RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(settings));
}

private boolean isTranslogSegmentRepoSame() {
// TODO - The current comparison checks the repository name. But it is also possible that the repository are same
// by attributes, but different by name. We need to handle this.
String translogRepoName = settings.get(TRANSLOG_REPO_NAME_KEY);
String segmentRepoName = settings.get(SEGMENT_REPO_NAME_KEY);
String translogRepoName = RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(settings);
String segmentRepoName = RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(settings);
return Objects.equals(translogRepoName, segmentRepoName);
}

Expand Down
Loading
Loading