Add RFS to CDK #575

Merged · 17 commits · Apr 16, 2024 · Changes from 12 commits

16 changes: 8 additions & 8 deletions RFS/README.md
@@ -78,30 +78,30 @@ RFS has support for packaging its java application as a Docker image by using th
```
Also built into this Docker/Gradle support is the ability to spin up a testing RFS environment using Docker compose. This compose file can be seen [here](./docker/docker-compose.yml) and includes the RFS container, a source cluster container, and a target cluster container.

This environment can be spun up with the Gradle command
This environment can be spun up with the Gradle command below; use the optional `-PshouldGenerateData` flag to preload a dataset from the `generateDatasetStage` in the multi-stage Dockerfile [here](docker/TestSource_ES_7_10/Dockerfile). This stage will take a few minutes on its first run when it is generating data, as it makes requests with OSB; the result is cached for future runs as long as the image remains the same.
```shell
./gradlew composeUp
./gradlew composeUp -PshouldGenerateData=true
Collaborator:
generate data isn't quite right since the user may never see generation happen (if they're using a cached layer).
From a docker image perspective, one has OSB datasets from an OSB run and the other doesn't. I would call this something like dataset=osb_4testWorkloads or something like that.

Collaborator Author:
Have changed this to be similar to dataset=osb_4testWorkloads


```
And deleted with the Gradle command
```shell
./gradlew composeDown
```

After the Docker compose containers are created the elasticsearch/opensearch source and target clusters can be interacted with like normal. For RFS testing, a user should load templates/indices/documents into the source cluster and take a snapshot before kicking off RFS.
After the Docker compose containers are created, the Elasticsearch/OpenSearch source and target clusters can be interacted with as usual. For RFS testing, a user can also load custom templates/indices/documents into the source cluster before kicking off RFS; a sample load command is shown after the status checks below.
```shell
# To check indices on the source cluster
curl https://localhost:19200/_cat/indices?v --insecure -u admin:admin
curl http://localhost:19200/_cat/indices?v

# To check indices on the target cluster
curl https://localhost:29200/_cat/indices?v --insecure -u admin:admin
curl http://localhost:29200/_cat/indices?v
```
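
For illustration only, a minimal way to seed the source cluster by hand is sketched below; the index name `test-index` and the document body are placeholders rather than anything the project defines.
```shell
# Create a hypothetical index on the source cluster (port 19200 per the compose file)
curl -X PUT "http://localhost:19200/test-index" -H "Content-Type: application/json" \
  -d '{"settings": {"number_of_shards": 1, "number_of_replicas": 0}}'

# Add a sample document to that index
curl -X POST "http://localhost:19200/test-index/_doc/1" -H "Content-Type: application/json" \
  -d '{"title": "sample", "body": "hello world"}'
```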

Once the user is ready, they can kick off the RFS migration by running the RFS java command with the proper arguments on the RFS container like below:
To kick off RFS:
```shell
docker exec -it rfs-compose-reindex-from-snapshot-1 sh -c "/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --snapshot-name <snapshot-name> --snapshot-dir <snapshot-dir> --lucene-dir '/lucene' --target-host https://opensearchtarget:9200 --target-username admin --target-password admin --source-version es_7_10 --target-version os_2_11"
docker exec -it rfs-compose-reindex-from-snapshot-1 sh -c "/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --snapshot-name test-snapshot --snapshot-local-repo-dir /snapshots --min-replicas 0 --lucene-dir '/lucene' --source-host http://elasticsearchsource:9200 --target-host http://opensearchtarget:9200 --source-version es_7_10 --target-version os_2_11"
```


### Handling auth

RFS currently supports basic auth (username/password) and no auth for both the source and target clusters. To use the no-auth approach, simply omit the username/password arguments, as sketched below.
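
As a sketch only (the argument values are placeholders taken from the compose example above), the two modes differ just in whether the username/password arguments are present:
```shell
# Basic auth: pass credentials for whichever cluster requires them
docker exec -it rfs-compose-reindex-from-snapshot-1 sh -c "/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --snapshot-name test-snapshot --snapshot-local-repo-dir /snapshots --min-replicas 0 --lucene-dir '/lucene' --source-host http://elasticsearchsource:9200 --target-host http://opensearchtarget:9200 --target-username admin --target-password admin --source-version es_7_10 --target-version os_2_11"

# No auth: omit the username/password arguments (e.g. --target-username/--target-password) entirely
docker exec -it rfs-compose-reindex-from-snapshot-1 sh -c "/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --snapshot-name test-snapshot --snapshot-local-repo-dir /snapshots --min-replicas 0 --lucene-dir '/lucene' --source-host http://elasticsearchsource:9200 --target-host http://opensearchtarget:9200 --source-version es_7_10 --target-version os_2_11"
```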
49 changes: 43 additions & 6 deletions RFS/build.gradle
@@ -3,13 +3,24 @@ plugins {
id 'java'
id "com.avast.gradle.docker-compose" version "0.17.4"
id 'com.bmuschko.docker-remote-api'
id 'io.freefair.lombok' version '8.6'
}

import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import groovy.transform.Canonical

java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

@Canonical
class DockerServiceProps {
String projectName = ""
String dockerImageName = ""
String inputDir = ""
Map<String, String> buildArgs = [:]
List<String> taskDependencies = []
}

repositories {
mavenCentral()
}
@@ -38,6 +49,10 @@ clean.doFirst {
delete project.file("./docker/build")
}

ext {
dataset = findProperty('shouldGenerateData') ?: 'false'
}

task demoPrintOutSnapshot (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.rfs.DemoPrintOutSnapshot'
@@ -56,6 +71,30 @@ task copyDockerRuntimeJars (type: Copy) {
include '*.jar'
}

DockerServiceProps[] dockerServices = [
new DockerServiceProps([projectName:"reindexFromSnapshot",
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"elasticsearchRFSSource",
dockerImageName:"elasticsearch_rfs_source",
inputDir:"./docker/TestSource_ES_7_10",
buildArgs:['SHOULD_GENERATE_DATA': "${project.ext.dataset}"]]),
] as DockerServiceProps[]


for (dockerService in dockerServices) {
task "buildDockerImage_${dockerService.projectName}" (type: DockerBuildImage) {
for (dep in dockerService.taskDependencies) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
buildArgs = dockerService.buildArgs
images.add("migrations/${dockerService.dockerImageName}:${version}")
images.add("migrations/${dockerService.dockerImageName}:latest")
}
}

// ./gradlew composeUp
// ./gradlew composeDown
dockerCompose {
@@ -64,12 +103,10 @@ dockerCompose {
}

// ./gradlew buildDockerImages
task buildDockerImages (type: DockerBuildImage) {
dependsOn copyDockerRuntimeJars
def dockerImageName = "reindex_from_snapshot"
inputDir = project.file("./docker")
images.add("migrations/${dockerImageName}:${version}")
images.add("migrations/${dockerImageName}:latest")
task buildDockerImages {
for (dockerService in dockerServices) {
dependsOn "buildDockerImage_${dockerService.projectName}"
}
}

tasks.getByName('composeUp')
36 changes: 30 additions & 6 deletions RFS/docker/TestSource_ES_7_10/Dockerfile
@@ -1,4 +1,29 @@
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2 AS base

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes count
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

RUN cd /etc/yum.repos.d/ && \
sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* && \
sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
RUN yum install -y gcc python3.9 python39-devel vim git less
RUN pip3 install opensearch-benchmark


FROM base AS generateDatasetStage

Collaborator:
You need to rotate the FROM base as generateDatasetStage upward. As it is, you'll install all of the yum packages for the final version, rather than ONLY propagate the ES data that you generated. As this is now, there's no reason to do a multistage build because you'll pretty much have the same amount of stuff in your final image - and there's going to be a lot that you don't need!

Collaborator Author (@lewijacn, Apr 16, 2024):
Since this was only a test ES source Docker image that we don't vend, it's been helpful to have these packages in the final version for any ad hoc testing when running RFS locally. As I'm thinking about this more, it may make sense to have the migration console be a part of the docker compose and let it handle things like this in the future.

Collaborator:
Why do you want those things (gcc, python-dev, OSB) on the ES container? Why isn’t the migration console sufficient?

Oh - you don’t have a unified docker env. yet for that distribution, do you? Hmmm - I guess that makes more sense. You should put a comment in the file to explain that.

Maybe I’ll pick up the task to lift the dockerSolution out of the TrafficCapture directory tomorrow

ARG SHOULD_GENERATE_DATA=false
COPY generateDataset.sh /root
RUN chmod ug+x /root/generateDataset.sh

RUN /root/generateDataset.sh ${SHOULD_GENERATE_DATA} && \
cd /usr/share/elasticsearch/data && tar -cvzf esDataFiles.tgz nodes

Collaborator:
I'm presuming that you aren't planning on setting up multiple parallel dataset generations (one for each workload, maybe things that aren't OSB, etc) and pull lots of different datasets potentially into the final container. If you wanted to support multiple datasets, I'd presume that you'd want separate images for each so that containers would only need to pay for what they were using.

Presuming that, it will be more efficient to drop the tar command and to just leave the directory intact. If you did think that you'd be generating a lot of images with different sample data. I think once you start looking to testing at scale, you'll want to pull snapshots into distributed clusters.

Collaborator Author:
I have removed the tarring step for now. Part of me would like to build more support here to make generating different datasets in parallel a bit easier, but I'm leaning more toward seeing the usefulness of this current piece and then iterating if we find it's something we'd really like. Testing multi-node clusters or even multiple RFS instances will look pretty different from what we have currently, so I am also cautious that this might shift direction.


FROM base

# Install the S3 Repo Plugin
RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3
@@ -8,13 +33,12 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2
unzip awscliv2.zip && \
./aws/install

RUN mkdir /snapshots && chown elasticsearch /snapshots

COPY --from=generateDatasetStage /usr/share/elasticsearch/data/esDataFiles.tgz /root/esDataFiles.tgz
# Install our custom entrypoint script
COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes coun
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/
RUN tar -xzf /root/esDataFiles.tgz -C /usr/share/elasticsearch/data

CMD /usr/share/elasticsearch/container-start.sh
27 changes: 27 additions & 0 deletions RFS/docker/TestSource_ES_7_10/generateDataset.sh
@@ -0,0 +1,27 @@
#!/bin/bash

generate_data_requests() {
endpoint="http://localhost:9200"
# If auth or SSL is used, the correlating OSB options should be provided in this array
options=()
client_options=$(IFS=,; echo "${options[*]}")
set -o xtrace

echo "Running opensearch-benchmark workloads against ${endpoint}"
echo "Running opensearch-benchmark w/ 'geonames' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=geonames --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
Collaborator:
I checked, but didn't find a way to JUST LOAD the data. The opensearch-benchmark runs load data and then they do tests on it. That latter part takes up most of the time & is work that we're not interested in for RFS. We might really, really want to load the data, do an RFS migration, then run the rest of the test so that we could test CDC on a historically migrated cluster.

It might be a good idea to open an issue or submit a PR with a new option for OSB.

Collaborator Author:
We should probably raise an issue as I don't have a good understanding of how intertwined these two things are from looking at the actual workloads: https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/geonames

echo "Running opensearch-benchmark w/ 'http_logs' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=http_logs --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nested' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nested --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nyc_taxis' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nyc_taxis --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options
}

should_generate_data=$1

if [[ "$should_generate_data" == true ]]; then
/usr/local/bin/docker-entrypoint.sh eswrapper & echo $! > /tmp/esWrapperProcess.pid && sleep 10 && generate_data_requests
else
mkdir -p /usr/share/elasticsearch/data/nodes
fi
17 changes: 12 additions & 5 deletions RFS/docker/docker-compose.yml
@@ -1,19 +1,19 @@
version: '3.7'
services:

# TODO: Sync gradle for RFS and C&R
# Temporarily this requires the user to first build the elasticsearch image from the TrafficCapture gradle project
# directory using a command like ./gradlew buildDockerImages
elasticsearchsource:
image: 'migrations/elasticsearch_searchguard:latest'
image: 'migrations/elasticsearch_rfs_source:latest'
networks:
- migrations
environment:
- discovery.type=single-node
- path.repo=/snapshots
ports:
- '19200:9200'
volumes:
- snapshotStorage:/snapshots

# TODO: Add example command users can run to kickoff RFS after data is loaded on source
# Sample command to kick off RFS here: https://github.com/opensearch-project/opensearch-migrations/blob/main/RFS/README.md#using-docker
reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
@@ -23,11 +23,14 @@ services:
condition: service_started
networks:
- migrations
volumes:
- snapshotStorage:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
environment:
- discovery.type=single-node
- plugins.security.disabled=true
networks:
- migrations
ports:
@@ -36,3 +39,7 @@
networks:
migrations:
driver: bridge

volumes:
snapshotStorage:
driver: local
40 changes: 31 additions & 9 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -28,9 +28,12 @@ public static class Args {
@Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
public String snapshotName;

@Parameter(names = {"--snapshot-dir"}, description = "The absolute path to the source snapshot directory on local disk", required = false)
@Parameter(names = {"--snapshot-dir"}, description = "The absolute path to the existing source snapshot directory on local disk", required = false)
public String snapshotDirPath = null;

@Parameter(names = {"--snapshot-local-repo-dir"}, description = "The absolute path to take and store a new snapshot on source, this location should be accessible by the source and this app", required = false)
public String snapshotLocalRepoDirPath = null;

@Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = false)
public String s3LocalDirPath = null;

@@ -70,6 +73,10 @@ public static class Args {
@Parameter(names = {"--movement-type"}, description = "What you want to move - everything, metadata, or data. Default: 'everything'", required = false, converter = MovementType.ArgsConverter.class)
public MovementType movementType = MovementType.EVERYTHING;

//https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas"}, description = "The minimum number of replicas configured for migrated indices on the target. This can be useful for migrating to targets which use zonal deployments and require additional replicas to meet zone requirements", required = true)
public int minNumberOfReplicas;

@Parameter(names = {"--template-whitelist"}, description = "List of template names to migrate. Note: For ES 6.8 this refers to legacy templates and for ES 7.10 this is index templates (e.g. 'posts_index_template1, posts_index_template2')", required = false)
public List<String> templateWhitelist;

@@ -93,6 +100,7 @@ public static void main(String[] args) throws InterruptedException {

String snapshotName = arguments.snapshotName;
Path snapshotDirPath = (arguments.snapshotDirPath != null) ? Paths.get(arguments.snapshotDirPath) : null;
Path snapshotLocalRepoDirPath = (arguments.snapshotLocalRepoDirPath != null) ? Paths.get(arguments.snapshotLocalRepoDirPath) : null;
Path s3LocalDirPath = (arguments.s3LocalDirPath != null) ? Paths.get(arguments.s3LocalDirPath) : null;
String s3RepoUri = arguments.s3RepoUri;
String s3Region = arguments.s3Region;
@@ -103,6 +111,7 @@ public static void main(String[] args) throws InterruptedException {
String targetHost = arguments.targetHost;
String targetUser = arguments.targetUser;
String targetPass = arguments.targetPass;
int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
ClusterVersion sourceVersion = arguments.sourceVersion;
ClusterVersion targetVersion = arguments.targetVersion;
List<String> templateWhitelist = arguments.templateWhitelist;
@@ -115,9 +124,6 @@ public static void main(String[] args) throws InterruptedException {
ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

// Should probably be passed in as an arguments
int awarenessAttributeDimensionality = 3; // https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/

// Sanity checks
if (!((sourceVersion == ClusterVersion.ES_6_8) || (sourceVersion == ClusterVersion.ES_7_10))) {
throw new IllegalArgumentException("Unsupported source version: " + sourceVersion);
@@ -133,25 +139,35 @@ public static void main(String[] args) throws InterruptedException {
* 2. A source host we'll take the snapshot from
* 3. An S3 URI of an existing snapshot in S3
*
* If you provide the source host, you still need to provide the S3 URI, etc to write the snapshot to.
* If you provide the source host, you still need to provide the S3 details or the snapshotLocalRepoDirPath to write the snapshot to.
*/
if (snapshotDirPath != null && (sourceHost != null || s3RepoUri != null)) {
throw new IllegalArgumentException("If you specify a local directory to take the snapshot from, you cannot specify a source host or S3 URI");
} else if (sourceHost != null && (s3RepoUri == null || s3Region == null || s3LocalDirPath == null)) {
throw new IllegalArgumentException("If you specify a source host, you must also specify the S3 details as well");
} else if (sourceHost != null) {
if (s3RepoUri == null && s3Region == null && s3LocalDirPath == null && snapshotLocalRepoDirPath == null) {
throw new IllegalArgumentException(
"If you specify a source host, you must also specify the S3 details or the snapshotLocalRepoDirPath to write the snapshot to as well");
}
if ((s3RepoUri != null || s3Region != null || s3LocalDirPath != null) &&
(s3RepoUri == null || s3Region == null || s3LocalDirPath == null)) {
throw new IllegalArgumentException(
"You must specify all S3 details (repo URI, region, local directory path)");
}
}

SourceRepo repo;
if (snapshotDirPath != null) {
repo = new FilesystemRepo(snapshotDirPath);
} else if (s3RepoUri != null && s3Region != null && s3LocalDirPath != null) {
repo = new S3Repo(s3LocalDirPath, s3RepoUri, s3Region);
} else if (snapshotLocalRepoDirPath != null) {
repo = new FilesystemRepo(snapshotLocalRepoDirPath);
} else {
throw new IllegalArgumentException("Could not construct a source repo from the available, user-supplied arguments");
}

// Set the transformer
Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, awarenessAttributeDimensionality);
Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, awarenessDimensionality);

try {

@@ -161,7 +177,9 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to create the snapshot...");
S3SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region);
SnapshotCreator snapshotCreator = repo instanceof S3Repo
? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region)
: new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString());
snapshotCreator.registerRepo();
snapshotCreator.createSnapshot();
while (!snapshotCreator.isSnapshotFinished()) {
@@ -344,6 +362,10 @@ public static void main(String[] args) throws InterruptedException {
}

logger.info("Documents reindexed successfully");

logger.info("Refreshing newly added documents");
Collaborator:
This is at info because refresh could take a significant amount of time? (seems reasonable, just checking)

Collaborator Author:
Yes for larger datasets this can take a noticeable amount of time

DocumentReindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}

} catch (Exception e) {
6 changes: 6 additions & 0 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -23,4 +23,10 @@ public static void reindex(String indexName, Document document, ConnectionDetail
RestClient client = new RestClient(targetConnection);
client.put(path, body, false);
}

public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
// Send the request
RestClient client = new RestClient(targetConnection);
client.get("_refresh", false);
}
}
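
For context on the refresh call added above: it is equivalent to hitting the cluster's `_refresh` endpoint directly, so a quick manual check against the compose target (assuming the default 29200 port mapping from the README) might look like:
```shell
# Manually trigger a refresh so newly reindexed documents become searchable
curl http://localhost:29200/_refresh

# Then confirm document counts on the target
curl http://localhost:29200/_cat/indices?v
```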