Add RFS to CDK #575

Merged · 17 commits · Apr 16, 2024

4 changes: 2 additions & 2 deletions RFS/README.md
@@ -78,9 +78,9 @@ RFS has support for packaging its java application as a Docker image by using the
```
Also built into this Docker/Gradle support is the ability to spin up a testing RFS environment using Docker compose. This compose file can be seen [here](./docker/docker-compose.yml) and includes the RFS container, a source cluster container, and a target cluster container.

This environment can be spun up with the Gradle command, and use the optional `-Pdataset` flag to preload a defined dataset from `docker/TestSource_ES_7_10/test-resources`
This environment can be spun up with the Gradle command below, using the optional `-Pdataset` flag to preload a dataset from the `generateDatasetStage` of the multi-stage Docker build [here](docker/TestSource_ES_7_10/Dockerfile). On its first run, this stage will take a few minutes if it is generating data, as it makes requests with OSB; the result is cached for future runs.
```shell
./gradlew composeUp -Pdataset='small-benchmark-single-node.tar.gz'
./gradlew composeUp -Pdataset=default_osb_test_workloads

```
And deleted with the Gradle command
12 changes: 7 additions & 5 deletions RFS/build.gradle
@@ -18,6 +18,7 @@ class DockerServiceProps {
String dockerImageName = ""
String inputDir = ""
Map<String, String> buildArgs = [:]
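// Gradle tasks that must run before this Docker image is built (e.g. copyDockerRuntimeJars)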
List<String> taskDependencies = []
}

repositories {
@@ -49,7 +50,7 @@ clean.doFirst {
}

ext {
dataset = findProperty('dataset') ?: 'no-data.tar.gz'
dataset = findProperty('dataset') ?: 'skip_dataset'
}

task demoPrintOutSnapshot (type: JavaExec) {
@@ -73,18 +74,19 @@ task copyDockerRuntimeJars (type: Copy) {
DockerServiceProps[] dockerServices = [
new DockerServiceProps([projectName:"reindexFromSnapshot",
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker"]),
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"elasticsearchRFSSource",
dockerImageName:"elasticsearch_rfs_source",
inputDir:"./docker/TestSource_ES_7_10",
buildArgs:['EXISTING_DATA': "${project.ext.dataset}"]]),
buildArgs:['DATASET': "${project.ext.dataset}"]]),
] as DockerServiceProps[]


for (dockerService in dockerServices) {
task "buildDockerImage_${dockerService.projectName}" (type: DockerBuildImage) {
if (dockerService.projectName == "reindexFromSnapshot") {
dependsOn "copyDockerRuntimeJars"
for (dep in dockerService.taskDependencies) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
buildArgs = dockerService.buildArgs
44 changes: 28 additions & 16 deletions RFS/docker/TestSource_ES_7_10/Dockerfile
@@ -1,10 +1,31 @@
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2 AS base

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes count
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

# TODO: These yum packages are currently included in the base image and propagated to the final image for adhoc testing
# with OSB, but should be placed in only the generateDatasetStage once we unify RFS and TC gradle setups and allow the Migration Console
# to have this responsibility. Task here: https://opensearch.atlassian.net/browse/MIGRATIONS-1628
RUN cd /etc/yum.repos.d/ && \
sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* && \
sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* && \
yum install -y python3.9 vim git && \
pip3 install opensearch-benchmark
sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
RUN yum install -y gcc python3.9 python39-devel vim git less
RUN pip3 install opensearch-benchmark


FROM base AS generateDatasetStage

ARG DATASET="skip_dataset"
COPY generateDataset.sh /root
RUN chmod ug+x /root/generateDataset.sh

RUN /root/generateDataset.sh ${DATASET}


FROM base

# Install the S3 Repo Plugin
RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3
@@ -14,21 +35,12 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2
unzip awscliv2.zip && \
./aws/install

ARG EXISTING_DATA="no-data.tar.gz"

RUN mkdir /snapshots && chown elasticsearch /snapshots
COPY ./test-resources/${EXISTING_DATA} /usr/share/elasticsearch
RUN tar -xzf /usr/share/elasticsearch/${EXISTING_DATA} -C /usr/share/elasticsearch/data && \
chown -R elasticsearch /usr/share/elasticsearch/data && \
rm /usr/share/elasticsearch/${EXISTING_DATA}

COPY --from=generateDatasetStage /usr/share/elasticsearch/data/nodes /usr/share/elasticsearch/data/nodes
RUN chown -R elasticsearch /usr/share/elasticsearch/data

# Install our custom entrypoint script
COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes count
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

CMD /usr/share/elasticsearch/container-start.sh
31 changes: 31 additions & 0 deletions RFS/docker/TestSource_ES_7_10/generateDataset.sh
@@ -0,0 +1,31 @@
#!/bin/bash

generate_data_requests() {
endpoint="http://localhost:9200"
# If auth or SSL is used, the correlating OSB options should be provided in this array
options=()
client_options=$(IFS=,; echo "${options[*]}")
set -o xtrace

echo "Running opensearch-benchmark workloads against ${endpoint}"
echo "Running opensearch-benchmark w/ 'geonames' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=geonames --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
Collaborator

I checked, but didn't find a way to JUST LOAD the data. The opensearch-benchmark runs load the data and then run tests on it. That latter part takes up most of the time and is work we're not interested in for RFS. We might really, really want to load the data, do an RFS migration, then run the rest of the test so that we could test CDC on a historically migrated cluster.

It might be a good idea to open an issue or submit a PR with a new option for OSB.

Collaborator Author

We should probably raise an issue, as I don't have a good understanding of how intertwined these two things are from looking at the actual workloads: https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/geonames

echo "Running opensearch-benchmark w/ 'http_logs' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=http_logs --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nested' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nested --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options &&
echo "Running opensearch-benchmark w/ 'nyc_taxis' workload..." &&
opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host=$endpoint --workload=nyc_taxis --pipeline=benchmark-only --test-mode --kill-running-processes --workload-params "target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,search_clients:1" --client-options=$client_options
}

dataset=$1

if [[ "$dataset" == "default_osb_test_workloads" ]]; then
/usr/local/bin/docker-entrypoint.sh eswrapper & echo $! > /tmp/esWrapperProcess.pid && sleep 10 && generate_data_requests
elif [[ "$dataset" == "skip_dataset" ]]; then
echo "Skipping data generation step"
mkdir -p /usr/share/elasticsearch/data/nodes
else
echo "Unknown dataset provided: ${dataset}"
exit 1;
fi
12 changes: 12 additions & 0 deletions deployment/cdk/opensearch-service-migration/README.md
@@ -196,6 +196,18 @@ With the [required setup](#importing-target-clusters) on the target cluster havi
The pipeline configuration file can be viewed (and updated) via AWS Secrets Manager.
Please note that it will be base64 encoded.

## Kicking off Reindex from Snapshot (RFS)

When the RFS service is deployed, it does not start running immediately; instead, the user controls when to kick off a historical data migration.

The following command can be run from the Migration Console to initiate the RFS historical data migration:
```shell
aws ecs update-service --cluster migration-<STAGE>-ecs-cluster --service migration-<STAGE>-reindex-from-snapshot --desired-count 1
```

Currently, upon completion, the RFS application will enter an idle state with the ECS container still running. This can be cleaned up by using the same command with `--desired-count 0`.
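For example, scaling the RFS service back down would look like:
```shell
aws ecs update-service --cluster migration-<STAGE>-ecs-cluster --service migration-<STAGE>-reindex-from-snapshot --desired-count 0
```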


## Monitoring Progress via Instrumentation

The replayer and capture proxy (if started with the `--otelCollectorEndpoint` argument) emit metrics through an
@@ -6,4 +6,7 @@ script_dir_abs_path=$(dirname "$script_abs_path")
cd "$script_dir_abs_path" || exit

cd ../../../TrafficCapture || exit
./gradlew :dockerSolution:buildDockerImages -x test
./gradlew :dockerSolution:buildDockerImages -x test

cd ../RFS || exit
./gradlew buildDockerImages -x test
@@ -7,3 +7,4 @@ export * from './migration-otel-collector-stack'
export * from './migration-service-core'
export * from './opensearch-container-stack'
export * from './traffic-replayer-stack'
export * from './reindex-from-snapshot-stack'
@@ -91,10 +91,12 @@ export class MigrationConsoleStack extends MigrationServiceCore {
]
})

const allReplayerServiceArn = `arn:aws:ecs:${props.env?.region}:${props.env?.account}:service/migration-${props.stage}-ecs-cluster/migration-${props.stage}-traffic-replayer*`
const ecsClusterArn = `arn:aws:ecs:${props.env?.region}:${props.env?.account}:service/migration-${props.stage}-ecs-cluster`
const allReplayerServiceArn = `${ecsClusterArn}/migration-${props.stage}-traffic-replayer*`
const reindexFromSnapshotServiceArn = `${ecsClusterArn}/migration-${props.stage}-reindex-from-snapshot`
const updateReplayerServicePolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [allReplayerServiceArn],
resources: [allReplayerServiceArn, reindexFromSnapshotServiceArn],
actions: [
"ecs:UpdateService"
]
@@ -0,0 +1,65 @@
import {StackPropsExt} from "../stack-composer";
import {IVpc, SecurityGroup} from "aws-cdk-lib/aws-ec2";
import {CpuArchitecture} from "aws-cdk-lib/aws-ecs";
import {Construct} from "constructs";
import {join} from "path";
import {MigrationServiceCore} from "./migration-service-core";
import {Effect, PolicyStatement} from "aws-cdk-lib/aws-iam";
import {StringParameter} from "aws-cdk-lib/aws-ssm";
import {
createOpenSearchIAMAccessPolicy,
createOpenSearchServerlessIAMAccessPolicy
} from "../common-utilities";


export interface ReindexFromSnapshotProps extends StackPropsExt {
readonly vpc: IVpc,
readonly sourceEndpoint: string,
readonly fargateCpuArch: CpuArchitecture,
readonly extraArgs?: string,
readonly otelCollectorEnabled?: boolean
}

export class ReindexFromSnapshotStack extends MigrationServiceCore {

constructor(scope: Construct, id: string, props: ReindexFromSnapshotProps) {
super(scope, id, props)
let securityGroups = [
SecurityGroup.fromSecurityGroupId(this, "serviceConnectSG", StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/serviceConnectSecurityGroupId`)),
SecurityGroup.fromSecurityGroupId(this, "defaultDomainAccessSG", StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/osAccessSecurityGroupId`)),
]

const artifactS3Arn = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/artifactS3Arn`)
const artifactS3AnyObjectPath = `${artifactS3Arn}/*`
const artifactS3PublishPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [artifactS3Arn, artifactS3AnyObjectPath],
actions: [
"s3:*"
]
})

const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.region, this.account)
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.region, this.account)
let servicePolicies = [artifactS3PublishPolicy, openSearchPolicy, openSearchServerlessPolicy]

const osClusterEndpoint = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/osClusterEndpoint`)
const s3Uri = `s3://migration-artifacts-${this.account}-${props.stage}-${this.region}/rfs-snapshot-repo`
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --s3-local-dir /tmp/s3_files --s3-repo-uri ${s3Uri} --s3-region ${props.env?.region} --snapshot-name rfs-snapshot --min-replicas 1 --enable-persistent-run --lucene-dir '/lucene' --source-host ${props.sourceEndpoint} --target-host ${osClusterEndpoint} --source-version es_7_10 --target-version os_2_11`
rfsCommand = props.extraArgs ? rfsCommand.concat(` ${props.extraArgs}`) : rfsCommand

this.createService({
serviceName: 'reindex-from-snapshot',
taskInstanceCount: 0,
dockerDirectoryPath: join(__dirname, "../../../../../", "RFS/docker"),
dockerImageCommand: ['/bin/sh', '-c', rfsCommand],
securityGroups: securityGroups,
taskRolePolicies: servicePolicies,
cpuArchitecture: props.fargateCpuArch,
taskCpuUnits: 1024,
taskMemoryLimitMiB: 4096,
...props
});
}

}
@@ -18,6 +18,7 @@ import {Application} from "@aws-cdk/aws-servicecatalogappregistry-alpha";
import {OpenSearchContainerStack} from "./service-stacks/opensearch-container-stack";
import {determineStreamingSourceType, StreamingSourceType} from "./streaming-source-type";
import {parseRemovalPolicy, validateFargateCpuArch} from "./common-utilities";
import {ReindexFromSnapshotStack} from "./service-stacks/reindex-from-snapshot-stack";

export interface StackPropsExt extends StackProps {
readonly stage: string,
@@ -183,6 +184,8 @@ export class StackComposer {
const sourceClusterEndpoint = this.getContextForType('sourceClusterEndpoint', 'string', defaultValues, contextJSON)
const osContainerServiceEnabled = this.getContextForType('osContainerServiceEnabled', 'boolean', defaultValues, contextJSON)
const otelCollectorEnabled = this.getContextForType('otelCollectorEnabled', 'boolean', defaultValues, contextJSON)
const reindexFromSnapshotServiceEnabled = this.getContextForType('reindexFromSnapshotServiceEnabled', 'boolean', defaultValues, contextJSON)
const reindexFromSnapshotExtraArgs = this.getContextForType('reindexFromSnapshotExtraArgs', 'string', defaultValues, contextJSON)

const requiredFields: { [key: string]: any; } = {"stage":stage, "domainName":domainName}
for (let key in requiredFields) {
@@ -377,7 +380,6 @@
this.stacks.push(kafkaBrokerStack)
}

// Currently, placing a requirement on a VPC for a fetch migration stack but this can be revisited
let fetchMigrationStack
if (fetchMigrationEnabled && networkStack && migrationStack) {
fetchMigrationStack = new FetchMigrationStack(scope, "fetchMigrationStack", {
@@ -395,6 +397,23 @@
this.stacks.push(fetchMigrationStack)
}

let reindexFromSnapshotStack
if (reindexFromSnapshotServiceEnabled && networkStack && migrationStack) {
reindexFromSnapshotStack = new ReindexFromSnapshotStack(scope, "reindexFromSnapshotStack", {
vpc: networkStack.vpc,
sourceEndpoint: sourceClusterEndpoint,
extraArgs: reindexFromSnapshotExtraArgs,
stackName: `OSMigrations-${stage}-${region}-ReindexFromSnapshot`,
description: "This stack contains resources to assist migrating historical data, via Reindex from Snapshot, to a target cluster",
stage: stage,
defaultDeployId: defaultDeployId,
fargateCpuArch: fargateCpuArch,
...props,
})
this.addDependentStacks(reindexFromSnapshotStack, [migrationStack, openSearchStack, osContainerStack])
this.stacks.push(reindexFromSnapshotStack)
}

let captureProxyESStack
if (captureProxyESServiceEnabled && networkStack && migrationStack) {
captureProxyESStack = new CaptureProxyESStack(scope, "capture-proxy-es", {
7 changes: 7 additions & 0 deletions deployment/cdk/opensearch-service-migration/options.md
@@ -33,6 +33,13 @@ These tables list all CDK context configuration values a user can specify for th
| sourceClusterEndpoint | string | `"https://source-cluster.elb.us-east-1.endpoint.com"` | The endpoint for the source cluster from which Fetch Migration will pull data. Required if `fetchMigrationEnabled` is set to `true` |
| dpPipelineTemplatePath | string | "path/to/config.yaml" | Path to a local Data Prepper pipeline configuration YAML file that Fetch Migration will use to derive source and target cluster endpoints and other settings. Default value is the included template file i.e. [dp_pipeline_template.yaml](dp_pipeline_template.yaml) |

### Reindex from Snapshot (RFS) Service Options
| Name | Type | Example | Description |
|-----------------------------------|---------|-------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------|
| reindexFromSnapshotServiceEnabled | boolean | true | Create resources for deploying and configuring the RFS ECS service |
| reindexFromSnapshotExtraArgs | string | "--log-level warn" | Extra arguments to provide to the RFS command. This includes all parameters supported by [RFS](../../../RFS/src/main/java/com/rfs/ReindexFromSnapshot.java). |
| sourceClusterEndpoint | string | `"https://source-cluster.elb.us-east-1.endpoint.com"` | The endpoint for the source cluster from which RFS will take a snapshot |
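
As a quick sketch, these options could be supplied as CDK context values at deploy time; the endpoint below is a placeholder, and the exact deploy invocation may differ for this project:
```shell
# Illustrative only: enable the RFS service and supply extra arguments via CDK context values
cdk deploy "*" \
  --context reindexFromSnapshotServiceEnabled=true \
  --context reindexFromSnapshotExtraArgs="--log-level warn" \
  --context sourceClusterEndpoint="https://source-cluster.elb.us-east-1.endpoint.com"
```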

### OpenSearch Domain Options
| Name | Type | Example | Description |
|-------------------------------------------------|--------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|