Merge branch 'opensearch-project:main' into MavenReleaseVersionUpdate
AndreKurait authored Apr 16, 2024
2 parents 5cee8f1 + a86e989 commit d9b4b24
Showing 70 changed files with 881 additions and 444 deletions.
16 changes: 8 additions & 8 deletions RFS/README.md
@@ -78,30 +78,30 @@ RFS has support for packaging its java application as a Docker image by using th
```
Also built into this Docker/Gradle support is the ability to spin up a testing RFS environment using Docker compose. This compose file can be seen [here](./docker/docker-compose.yml) and includes the RFS container, a source cluster container, and a target cluster container.

This environment can be spun up with the Gradle command
This environment can be spun up with the Gradle command below. Use the optional `-Pdataset` flag to preload a dataset from the `generateDatasetStage` in the multi-stage Docker build [here](docker/TestSource_ES_7_10/Dockerfile). This stage will take a few minutes on its first run if it is generating data, as it makes requests with OSB; the result is cached for future runs.
```shell
./gradlew composeUp
./gradlew composeUp -Pdataset=default_osb_test_workloads

```
And deleted with the Gradle command
```shell
./gradlew composeDown
```

After the Docker compose containers are created the elasticsearch/opensearch source and target clusters can be interacted with like normal. For RFS testing, a user should load templates/indices/documents into the source cluster and take a snapshot before kicking off RFS.
After the Docker compose containers are created, the Elasticsearch/OpenSearch source and target clusters can be interacted with as usual. For RFS testing, a user can also load custom templates/indices/documents into the source cluster before kicking off RFS.
```shell
# To check indices on the source cluster
curl https://localhost:19200/_cat/indices?v --insecure -u admin:admin
curl http://localhost:19200/_cat/indices?v

# To check indices on the target cluster
curl https://localhost:29200/_cat/indices?v --insecure -u admin:admin
curl http://localhost:29200/_cat/indices?v
```

Once the user is ready, they can kick off the RFS migration by running the RFS java command with the proper arguments on the RFS container like below:
To kick off RFS, run the following against the RFS container:
```shell
docker exec -it rfs-compose-reindex-from-snapshot-1 sh -c "/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --snapshot-name <snapshot-name> --snapshot-dir <snapshot-dir> --lucene-dir '/lucene' --target-host https://opensearchtarget:9200 --target-username admin --target-password admin --source-version es_7_10 --target-version os_2_11"
docker exec -it rfs-compose-reindex-from-snapshot-1 sh -c "/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --snapshot-name test-snapshot --snapshot-local-repo-dir /snapshots --min-replicas 0 --lucene-dir '/lucene' --source-host http://elasticsearchsource:9200 --target-host http://opensearchtarget:9200 --source-version es_7_10 --target-version os_2_11"
```
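
Once the migration completes, a quick sanity check is to compare the index listings on the two clusters and spot-check a document count on the target (a sketch using the default compose ports above; replace `<index-name>` with one of your migrated indices):
```shell
# Compare index listings between source and target
curl http://localhost:19200/_cat/indices?v
curl http://localhost:29200/_cat/indices?v

# Spot-check a document count on the target
curl "http://localhost:29200/<index-name>/_count?pretty"
```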


### Handling auth

RFS currently supports both basic auth (username/password) and no auth for both the source and target clusters. To use the no-auth approach, simply omit the username/password arguments.
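
For example, the target-related flags from the command above might look like this with and without basic auth (a sketch, not an exhaustive argument list):
```shell
# Basic auth against the target
--target-host http://opensearchtarget:9200 --target-username admin --target-password admin

# No auth: omit the username/password arguments entirely
--target-host http://opensearchtarget:9200
```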
49 changes: 43 additions & 6 deletions RFS/build.gradle
@@ -3,13 +3,24 @@ plugins {
id 'java'
id "com.avast.gradle.docker-compose" version "0.17.4"
id 'com.bmuschko.docker-remote-api'
id 'io.freefair.lombok' version '8.6'
}

import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import groovy.transform.Canonical

java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

@Canonical
class DockerServiceProps {
String projectName = ""
String dockerImageName = ""
String inputDir = ""
Map<String, String> buildArgs = [:]
List<String> taskDependencies = []
}

repositories {
mavenCentral()
}
@@ -38,6 +49,10 @@ clean.doFirst {
delete project.file("./docker/build")
}

ext {
dataset = findProperty('dataset') ?: 'skip_dataset'
}

task demoPrintOutSnapshot (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.rfs.DemoPrintOutSnapshot'
@@ -56,6 +71,30 @@ task copyDockerRuntimeJars (type: Copy) {
include '*.jar'
}

DockerServiceProps[] dockerServices = [
new DockerServiceProps([projectName:"reindexFromSnapshot",
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"elasticsearchRFSSource",
dockerImageName:"elasticsearch_rfs_source",
inputDir:"./docker/TestSource_ES_7_10",
buildArgs:['DATASET': "${project.ext.dataset}"]]),
] as DockerServiceProps[]


for (dockerService in dockerServices) {
task "buildDockerImage_${dockerService.projectName}" (type: DockerBuildImage) {
for (dep in dockerService.taskDependencies) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
buildArgs = dockerService.buildArgs
images.add("migrations/${dockerService.dockerImageName}:${version}")
images.add("migrations/${dockerService.dockerImageName}:latest")
}
}
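
// Each entry above also yields a per-service build task, e.g. (dataset flag optional):
//   ./gradlew buildDockerImage_reindexFromSnapshot
//   ./gradlew buildDockerImage_elasticsearchRFSSource -Pdataset=default_osb_test_workloads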

// ./gradlew composeUp
// ./gradlew composeDown
dockerCompose {
@@ -64,12 +103,10 @@ dockerCompose {
}

// ./gradlew buildDockerImages
task buildDockerImages (type: DockerBuildImage) {
dependsOn copyDockerRuntimeJars
def dockerImageName = "reindex_from_snapshot"
inputDir = project.file("./docker")
images.add("migrations/${dockerImageName}:${version}")
images.add("migrations/${dockerImageName}:latest")
task buildDockerImages {
for (dockerService in dockerServices) {
dependsOn "buildDockerImage_${dockerService.projectName}"
}
}

tasks.getByName('composeUp')
40 changes: 33 additions & 7 deletions RFS/docker/TestSource_ES_7_10/Dockerfile
@@ -1,4 +1,31 @@
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2 AS base

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes count
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

# TODO: These yum packages are currently included in the base image and propagated to the final image for adhoc testing
# with OSB, but should be placed in only the generateDatasetStage once we unify RFS and TC gradle setups and allow the Migration Console
# to have this responsibility. Task here: https://opensearch.atlassian.net/browse/MIGRATIONS-1628
RUN cd /etc/yum.repos.d/ && \
sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* && \
sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
RUN yum install -y gcc python3.9 python39-devel vim git less
RUN pip3 install opensearch-benchmark


FROM base AS generateDatasetStage

ARG DATASET="skip_dataset"
COPY generateDataset.sh /root
RUN chmod ug+x /root/generateDataset.sh

RUN /root/generateDataset.sh ${DATASET}


FROM base

# Install the S3 Repo Plugin
RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3
@@ -8,13 +35,12 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2
unzip awscliv2.zip && \
./aws/install

RUN mkdir /snapshots && chown elasticsearch /snapshots

COPY --from=generateDatasetStage /usr/share/elasticsearch/data/nodes /usr/share/elasticsearch/data/nodes
RUN chown -R elasticsearch /usr/share/elasticsearch/data

# Install our custom entrypoint script
COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes count
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

CMD /usr/share/elasticsearch/container-start.sh
31 changes: 31 additions & 0 deletions RFS/docker/TestSource_ES_7_10/generateDataset.sh
@@ -0,0 +1,31 @@
#!/bin/bash

generate_data_requests() {
endpoint="http://localhost:9200"
# If auth or SSL is used, the correlating OSB options should be provided in this array
options=()
client_options=$(IFS=,; echo "${options[*]}")
set -o xtrace

echo "Running opensearch-benchmark workloads against ${endpoint}"
echo "Running opensearch-benchmark w/ 'geonames' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=geonames --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'http_logs' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=http_logs --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nested' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nested --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nyc_taxis' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nyc_taxis --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options
}

dataset=$1

if [[ "$dataset" == "default_osb_test_workloads" ]]; then
/usr/local/bin/docker-entrypoint.sh eswrapper & echo $! > /tmp/esWrapperProcess.pid && sleep 10 && generate_data_requests
elif [[ "$dataset" == "skip_dataset" ]]; then
echo "Skipping data generation step"
mkdir -p /usr/share/elasticsearch/data/nodes
else
echo "Unknown dataset provided: ${dataset}"
exit 1;
fi
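
# Example (hypothetical) manual invocations inside the source image, mirroring how the
# Dockerfile's generateDatasetStage calls this script with the DATASET build arg:
#   /root/generateDataset.sh default_osb_test_workloads  # start ES locally and run the OSB workloads above
#   /root/generateDataset.sh skip_dataset                # skip generation; just create the empty data dir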
17 changes: 12 additions & 5 deletions RFS/docker/docker-compose.yml
@@ -1,19 +1,19 @@
version: '3.7'
services:

# TODO: Sync gradle for RFS and C&R
# Temporarily this requires the user to first build the elasticsearch image from the TrafficCapture gradle project
# directory using a command like ./gradlew buildDockerImages
elasticsearchsource:
image: 'migrations/elasticsearch_searchguard:latest'
image: 'migrations/elasticsearch_rfs_source:latest'
networks:
- migrations
environment:
- discovery.type=single-node
- path.repo=/snapshots
ports:
- '19200:9200'
volumes:
- snapshotStorage:/snapshots

# TODO: Add example command users can run to kickoff RFS after data is loaded on source
# Sample command to kick off RFS here: https://github.com/opensearch-project/opensearch-migrations/blob/main/RFS/README.md#using-docker
reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
@@ -23,11 +23,14 @@ services:
condition: service_started
networks:
- migrations
volumes:
- snapshotStorage:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
environment:
- discovery.type=single-node
- plugins.security.disabled=true
networks:
- migrations
ports:
@@ -36,3 +39,7 @@
networks:
migrations:
driver: bridge

volumes:
snapshotStorage:
driver: local
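
# Note: the snapshotStorage volume is mounted at /snapshots in both the source and RFS
# containers, so a snapshot taken with --snapshot-local-repo-dir /snapshots on the source
# is directly readable by RFS. One way to confirm this after taking a snapshot (container
# name assumed from the rfs-compose-* naming shown in the README):
#   docker exec -it rfs-compose-elasticsearchsource-1 ls /snapshots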
40 changes: 31 additions & 9 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -28,9 +28,12 @@ public static class Args {
@Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
public String snapshotName;

@Parameter(names = {"--snapshot-dir"}, description = "The absolute path to the source snapshot directory on local disk", required = false)
@Parameter(names = {"--snapshot-dir"}, description = "The absolute path to the existing source snapshot directory on local disk", required = false)
public String snapshotDirPath = null;

@Parameter(names = {"--snapshot-local-repo-dir"}, description = "The absolute path to take and store a new snapshot on source, this location should be accessible by the source and this app", required = false)
public String snapshotLocalRepoDirPath = null;

@Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = false)
public String s3LocalDirPath = null;

@@ -70,6 +73,10 @@ public static class Args {
@Parameter(names = {"--movement-type"}, description = "What you want to move - everything, metadata, or data. Default: 'everything'", required = false, converter = MovementType.ArgsConverter.class)
public MovementType movementType = MovementType.EVERYTHING;

//https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas"}, description = "The minimum number of replicas configured for migrated indices on the target. This can be useful for migrating to targets which use zonal deployments and require additional replicas to meet zone requirements", required = true)
public int minNumberOfReplicas;

@Parameter(names = {"--template-whitelist"}, description = "List of template names to migrate. Note: For ES 6.8 this refers to legacy templates and for ES 7.10 this is index templates (e.g. 'posts_index_template1, posts_index_template2')", required = false)
public List<String> templateWhitelist;

@@ -93,6 +100,7 @@ public static void main(String[] args) throws InterruptedException {

String snapshotName = arguments.snapshotName;
Path snapshotDirPath = (arguments.snapshotDirPath != null) ? Paths.get(arguments.snapshotDirPath) : null;
Path snapshotLocalRepoDirPath = (arguments.snapshotLocalRepoDirPath != null) ? Paths.get(arguments.snapshotLocalRepoDirPath) : null;
Path s3LocalDirPath = (arguments.s3LocalDirPath != null) ? Paths.get(arguments.s3LocalDirPath) : null;
String s3RepoUri = arguments.s3RepoUri;
String s3Region = arguments.s3Region;
@@ -103,6 +111,7 @@
String targetHost = arguments.targetHost;
String targetUser = arguments.targetUser;
String targetPass = arguments.targetPass;
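// Each shard keeps minNumberOfReplicas replica copies plus one primary, hence the +1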
int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
ClusterVersion sourceVersion = arguments.sourceVersion;
ClusterVersion targetVersion = arguments.targetVersion;
List<String> templateWhitelist = arguments.templateWhitelist;
@@ -115,9 +124,6 @@
ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Should probably be passed in as an argument
int awarenessAttributeDimensionality = 3; // https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/

// Sanity checks
if (!((sourceVersion == ClusterVersion.ES_6_8) || (sourceVersion == ClusterVersion.ES_7_10))) {
throw new IllegalArgumentException("Unsupported source version: " + sourceVersion);
@@ -133,25 +139,35 @@
* 2. A source host we'll take the snapshot from
* 3. An S3 URI of an existing snapshot in S3
*
* If you provide the source host, you still need to provide the S3 URI, etc to write the snapshot to.
* If you provide the source host, you still need to provide the S3 details or the snapshotLocalRepoDirPath to write the snapshot to.
*/
if (snapshotDirPath != null && (sourceHost != null || s3RepoUri != null)) {
throw new IllegalArgumentException("If you specify a local directory to take the snapshot from, you cannot specify a source host or S3 URI");
} else if (sourceHost != null && (s3RepoUri == null || s3Region == null || s3LocalDirPath == null)) {
throw new IllegalArgumentException("If you specify a source host, you must also specify the S3 details as well");
} else if (sourceHost != null) {
if (s3RepoUri == null && s3Region == null && s3LocalDirPath == null && snapshotLocalRepoDirPath == null) {
throw new IllegalArgumentException(
"If you specify a source host, you must also specify the S3 details or the snapshotLocalRepoDirPath to write the snapshot to as well");
}
if ((s3RepoUri != null || s3Region != null || s3LocalDirPath != null) &&
(s3RepoUri == null || s3Region == null || s3LocalDirPath == null)) {
throw new IllegalArgumentException(
"You must specify all S3 details (repo URI, region, local directory path)");
}
}

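// Choose the snapshot repo implementation based on which arguments were supplied:
// an existing local snapshot dir, an S3 repo, or a local repo shared with the source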
SourceRepo repo;
if (snapshotDirPath != null) {
repo = new FilesystemRepo(snapshotDirPath);
} else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) {
repo = new S3Repo(s3LocalDirPath, s3RepoUri, s3Region);
} else if (snapshotLocalRepoDirPath != null) {
repo = new FilesystemRepo(snapshotLocalRepoDirPath);
} else {
throw new IllegalArgumentException("Could not construct a source repo from the available, user-supplied arguments");
}

// Set the transformer
Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, awarenessAttributeDimensionality);
Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, awarenessDimensionality);

try {

@@ -161,7 +177,9 @@
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to create the snapshot...");
S3SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region);
SnapshotCreator snapshotCreator = repo instanceof S3Repo
? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region)
: new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString());
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();
while (!snapshotCreator.isSnapshotFinished()) {
@@ -344,6 +362,10 @@
}

logger.info("Documents reindexed successfully");

logger.info("Refreshing newly added documents");
DocumentReindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}

} catch (Exception e) {
6 changes: 6 additions & 0 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -23,4 +23,10 @@ public static void reindex(String indexName, Document document, ConnectionDetail
RestClient client = new RestClient(targetConnection);
client.put(path, body, false);
}

public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
// Issue a refresh against the target so newly indexed documents become searchable
RestClient client = new RestClient(targetConnection);
client.get("_refresh", false);
}
}
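
For reference, the refresh call above amounts to the following request against the target (a sketch using the compose target port from the README):
```shell
curl http://localhost:29200/_refresh
```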