Skip to content

Commit

Permalink
[Feature] Add Basic Support for OpenSearch Ingestion controls from th…
Browse files Browse the repository at this point in the history
…e Migration Console (#621)

This change introduces basic support for creating/starting/stopping an OpenSearch Ingestion pipeline.

It provides an experimental CDK option for enabling OSI on the migration console. This option does not prevent creating the OSI python tool on the Migration Console container, but rather controls creating the IAM roles and policies related to OSI

It sets up necessary IAM policies for the Migration Console to assume as a 'Manager" of the OSI pipeline. It also creates a pipeline role that will be assumed by OSI to communicate with the source and target clusters

Adds the osiMigration.py script to the migration console for controlling OSI pipelines. This script supports starting/stopping a pipeline as well as creating a pipeline from provided command arguments or creating the pipeline from inferred settings based on the associated deployment.

See README update for details on executing. Once this is merged, I expect to copy and paste something similar to the wiki.

Signed-off-by: Tanner Lewis <lewijacn@amazon.com>
  • Loading branch information
lewijacn authored May 1, 2024
1 parent 21fcdd5 commit 4beba00
Show file tree
Hide file tree
Showing 16 changed files with 578 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# W504 - Line break occurred after a binary operator
ignore = E265,E402,E999,W293,W504
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/*

# F401 - Unused imports -- this is the only way to have a file-wide rule exception
per-file-ignores =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ FROM ubuntu:jammy

ENV DEBIAN_FRONTEND noninteractive

COPY lib /root/lib

RUN apt-get update && \
apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev openjdk-11-jre-headless wget gcc libc-dev git curl vim jq unzip less && \
pip3 install urllib3 opensearch-benchmark==1.2.0 awscurl tqdm awscli
pip3 install urllib3 opensearch-benchmark==1.2.0 awscurl tqdm awscli -r /root/lib/osiMigrationLib/requirements.txt
RUN mkdir /root/kafka-tools
RUN mkdir /root/kafka-tools/aws

Expand All @@ -25,6 +27,8 @@ COPY simpleDocumentGenerator.py /root/
COPY catIndices.sh /root/
COPY showFetchMigrationCommand.sh /root/
COPY setupIntegTests.sh /root/
COPY osiMigration.py /root/
COPY osiPipelineTemplate.yaml /root/
COPY msk-iam-auth.properties /root/kafka-tools/aws
COPY kafkaCmdRef.md /root/kafka-tools
COPY kafkaExport.sh /root/kafka-tools
Expand All @@ -34,6 +38,7 @@ RUN chmod ug+x /root/humanReadableLogs.py
RUN chmod ug+x /root/simpleDocumentGenerator.py
RUN chmod ug+x /root/catIndices.sh
RUN chmod ug+x /root/showFetchMigrationCommand.sh
RUN chmod ug+x /root/osiMigration.py
RUN chmod ug+x /root/kafka-tools/kafkaExport.sh

CMD tail -f /dev/null
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3
coloredlogs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
version: "2"
# NOTE: Placeholder values will be automatically populated and do not need to be changed
<AWS_SECRET_CONFIG_PLACEHOLDER>

historical-data-migration:

# Source cluster configuration
source:
opensearch:
hosts:
- <SOURCE_CLUSTER_ENDPOINT_PLACEHOLDER>
indices:
# Indices to exclude - exclude system indices by default
exclude:
- index_name_regex: \.*
<SOURCE_AUTH_OPTIONS_PLACEHOLDER>

# Target cluster configuration
sink:
- opensearch:
hosts:
- <TARGET_CLUSTER_ENDPOINT_PLACEHOLDER>
# Derive index name from record metadata
index: ${getMetadata("opensearch-index")}
# Use the same document ID as the source cluster document
document_id: ${getMetadata("opensearch-document_id")}
<TARGET_AUTH_OPTIONS_PLACEHOLDER>
25 changes: 25 additions & 0 deletions deployment/cdk/opensearch-service-migration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,31 @@ With the [required setup](#importing-target-clusters) on the target cluster havi
The pipeline configuration file can be viewed (and updated) via AWS Secrets Manager.
Please note that it will be base64 encoded.

## Kicking off OpenSearch Ingestion Service

**Note**: Using OpenSearch Ingestion Service is currently an experimental feature that must be enabled with the `migrationConsoleEnableOSI` option. Currently only Managed OpenSearch service as a source to Managed OpenSearch service as a target migrations are supported

After enabling and deploying the CDK, log into the Migration Console
```shell
# ./accessContainer.sh migration-console STAGE REGION
./accessContainer.sh migration-console dev us-east-1
```
Make any modifications to the `osiPipelineTemplate.yaml` on the Migration Console, if needed. Note: Placeholder values exist in the file to automatically populate source/target endpoints and corresponding auth options by the python tool that uses this yaml file.

The OpenSearch Ingestion pipeline can then be created by giving an existing source cluster endpoint and running the below command
```shell
./osiMigration.py create-pipeline-from-solution --source-endpoint=<SOURCE_ENDPOINT>
```

When OpenSearch Ingestion pipelines are created they begin running immediately and can be stopped with the following command
```shell
./osiMigration.py stop-pipeline
```
Or restarted with the following command
```shell
./osiMigration.py start-pipeline
```

## Kicking off Reindex from Snapshot (RFS)

When the RFS service gets deployed, it does not start running immediately. Instead, the user controls when they want to kick off a historical data migration.
Expand Down
1 change: 1 addition & 0 deletions deployment/cdk/opensearch-service-migration/bin/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ const customReplayerUserAgent = process.env.CUSTOM_REPLAYER_USER_AGENT
new StackComposer(app, {
migrationsAppRegistryARN: migrationsAppRegistryARN,
customReplayerUserAgent: customReplayerUserAgent,
migrationsSolutionVersion: version,
env: { account: account, region: region }
});
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class MSKUtilityStack extends Stack {
const lambdaInvokeStatement = new PolicyStatement({
effect: Effect.ALLOW,
actions: ["lambda:InvokeFunction"],
resources: [`arn:aws:lambda:${props.env?.region}:${props.env?.account}:function:OSMigrations*`]
resources: [`arn:aws:lambda:${this.region}:${this.account}:function:OSMigrations*`]
})
// Updating connectivity for an MSK cluster requires some VPC permissions
// (https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonmanagedstreamingforapachekafka.html#amazonmanagedstreamingforapachekafka-cluster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ import {Construct} from "constructs";
import {join} from "path";
import {MigrationServiceCore} from "./migration-service-core";
import {StringParameter} from "aws-cdk-lib/aws-ssm";
import {Effect, PolicyStatement} from "aws-cdk-lib/aws-iam";
import {Effect, PolicyStatement, Role, ServicePrincipal} from "aws-cdk-lib/aws-iam";
import {
createOpenSearchIAMAccessPolicy,
createOpenSearchServerlessIAMAccessPolicy
} from "../common-utilities";
import {StreamingSourceType} from "../streaming-source-type";
import {LogGroup, RetentionDays} from "aws-cdk-lib/aws-logs";
import {RemovalPolicy} from "aws-cdk-lib";


export interface MigrationConsoleProps extends StackPropsExt {
readonly migrationsSolutionVersion: string,
readonly vpc: IVpc,
readonly streamingSourceType: StreamingSourceType,
readonly fetchMigrationEnabled: boolean,
readonly fargateCpuArch: CpuArchitecture,
readonly otelCollectorEnabled: boolean
readonly otelCollectorEnabled: boolean,
readonly migrationConsoleEnableOSI: boolean
}

export class MigrationConsoleStack extends MigrationServiceCore {
Expand Down Expand Up @@ -52,6 +56,45 @@ export class MigrationConsoleStack extends MigrationServiceCore {
return [mskClusterAdminPolicy, mskTopicAdminPolicy, mskConsumerGroupAdminPolicy]
}

configureOpenSearchIngestionPipelineRole(stage: string, deployId: string) {
const osiPipelineRole = new Role(this, 'osisPipelineRole', {
assumedBy: new ServicePrincipal('osis-pipelines.amazonaws.com'),
description: 'OpenSearch Ingestion Pipeline role for OpenSearch Migrations'
});
// Add policy to allow access to Opensearch domains
osiPipelineRole.addToPolicy(new PolicyStatement({
effect: Effect.ALLOW,
actions: ["es:DescribeDomain", "es:ESHttp*"],
resources: [`arn:aws:es:${this.region}:${this.account}:domain/*`]
}))

new StringParameter(this, 'SSMParameterOSIPipelineRoleArn', {
description: 'OpenSearch Migration Parameter for OpenSearch Ingestion Pipeline Role ARN',
parameterName: `/migration/${stage}/${deployId}/osiPipelineRoleArn`,
stringValue: osiPipelineRole.roleArn
});
return osiPipelineRole.roleArn
}

createOpenSearchIngestionManagementPolicy(pipelineRoleArn: string): PolicyStatement[] {
const allMigrationPipelineArn = `arn:aws:osis:${this.region}:${this.account}:pipeline/*`
const osiManagementPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [allMigrationPipelineArn],
actions: [
"osis:*"
]
})
const passPipelineRolePolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [pipelineRoleArn],
actions: [
"iam:PassRole"
]
})
return [osiManagementPolicy, passPipelineRolePolicy]
}

constructor(scope: Construct, id: string, props: MigrationConsoleProps) {
super(scope, id, props)
let securityGroups = [
Expand Down Expand Up @@ -81,7 +124,7 @@ export class MigrationConsoleStack extends MigrationServiceCore {
readOnly: false,
sourceVolume: volumeName
}
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${props.env?.region}:${props.env?.account}:file-system/${volumeId}`
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${this.region}:${this.account}:file-system/${volumeId}`
const replayerOutputMountPolicy = new PolicyStatement( {
effect: Effect.ALLOW,
resources: [replayerOutputEFSArn],
Expand All @@ -91,10 +134,10 @@ export class MigrationConsoleStack extends MigrationServiceCore {
]
})

const ecsClusterArn = `arn:aws:ecs:${props.env?.region}:${props.env?.account}:service/migration-${props.stage}-ecs-cluster`
const ecsClusterArn = `arn:aws:ecs:${this.region}:${this.account}:service/migration-${props.stage}-ecs-cluster`
const allReplayerServiceArn = `${ecsClusterArn}/migration-${props.stage}-traffic-replayer*`
const reindexFromSnapshotServiceArn = `${ecsClusterArn}/migration-${props.stage}-reindex-from-snapshot`
const updateReplayerServicePolicy = new PolicyStatement({
const ecsUpdateServicePolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [allReplayerServiceArn, reindexFromSnapshotServiceArn],
actions: [
Expand All @@ -112,14 +155,34 @@ export class MigrationConsoleStack extends MigrationServiceCore {
]
})

// Allow Console to determine proper subnets to use for any resource creation
const describeVPCPolicy = new PolicyStatement( {
effect: Effect.ALLOW,
resources: ["*"],
actions: [
"ec2:DescribeSubnets",
"ec2:DescribeRouteTables"
]
})

// Allow Console to retrieve SSM Parameters
const getSSMParamsPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:ssm:${this.region}:${this.account}:parameter/migration/${props.stage}/${props.defaultDeployId}/*`],
actions: [
"ssm:GetParameters"
]
})

const environment: { [key: string]: string; } = {
"MIGRATION_DOMAIN_ENDPOINT": osClusterEndpoint,
"MIGRATION_KAFKA_BROKER_ENDPOINTS": brokerEndpoints,
"MIGRATION_STAGE": props.stage
"MIGRATION_STAGE": props.stage,
"MIGRATION_SOLUTION_VERSION": props.migrationsSolutionVersion
}
const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.region, this.account)
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.region, this.account)
let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, updateReplayerServicePolicy, artifactS3PublishPolicy]
let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, ecsUpdateServicePolicy, artifactS3PublishPolicy, describeVPCPolicy, getSSMParamsPolicy]
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
const mskAdminPolicies = this.createMSKAdminIAMPolicies(props.stage, props.defaultDeployId)
servicePolicies = servicePolicies.concat(mskAdminPolicies)
Expand Down Expand Up @@ -149,6 +212,21 @@ export class MigrationConsoleStack extends MigrationServiceCore {
servicePolicies.push(fetchMigrationPassRolePolicy)
}

if (props.migrationConsoleEnableOSI) {
const pipelineRoleArn = this.configureOpenSearchIngestionPipelineRole(props.stage, props.defaultDeployId)
servicePolicies.push(...this.createOpenSearchIngestionManagementPolicy(pipelineRoleArn))
const osiLogGroup = new LogGroup(this, 'OSILogGroup', {
retention: RetentionDays.ONE_MONTH,
removalPolicy: RemovalPolicy.DESTROY,
logGroupName: `/migration/${props.stage}/${props.defaultDeployId}/openSearchIngestion`
});
new StringParameter(this, 'SSMParameterOSIPipelineLogGroupName', {
description: 'OpenSearch Migration Parameter for OpenSearch Ingestion Pipeline Log Group Name',
parameterName: `/migration/${props.stage}/${props.defaultDeployId}/osiPipelineLogGroupName`,
stringValue: osiLogGroup.logGroupName
});
}

this.createService({
serviceName: "migration-console",
dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/migrationConsole"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class MigrationServiceCore extends Stack {
const namespace = PrivateDnsNamespace.fromPrivateDnsNamespaceAttributes(this, "PrivateDNSNamespace", {
namespaceName: `migration.${props.stage}.local`,
namespaceId: namespaceId,
namespaceArn: `arn:aws:servicediscovery:${props.env?.region}:${props.env?.account}:namespace/${namespaceId}`
namespaceArn: `arn:aws:servicediscovery:${this.region}:${this.account}:namespace/${namespaceId}`
})
cloudMapOptions = {
name: props.serviceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {

const osClusterEndpoint = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/osClusterEndpoint`)
const s3Uri = `s3://migration-artifacts-${this.account}-${props.stage}-${this.region}/rfs-snapshot-repo`
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --s3-local-dir /tmp/s3_files --s3-repo-uri ${s3Uri} --s3-region ${props.env?.region} --snapshot-name rfs-snapshot --min-replicas 1 --enable-persistent-run --lucene-dir '/lucene' --source-host ${props.sourceEndpoint} --target-host ${osClusterEndpoint} --source-version es_7_10 --target-version os_2_11`
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --s3-local-dir /tmp/s3_files --s3-repo-uri ${s3Uri} --s3-region ${this.region} --snapshot-name rfs-snapshot --min-replicas 1 --enable-persistent-run --lucene-dir '/lucene' --source-host ${props.sourceEndpoint} --target-host ${osClusterEndpoint} --source-version es_7_10 --target-version os_2_11`
rfsCommand = props.extraArgs ? rfsCommand.concat(` ${props.extraArgs}`) : rfsCommand

this.createService({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class TrafficReplayerStack extends MigrationServiceCore {
readOnly: false,
sourceVolume: volumeName
}
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${props.env?.region}:${props.env?.account}:file-system/${volumeId}`
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${this.region}:${this.account}:file-system/${volumeId}`
const replayerOutputMountPolicy = new PolicyStatement( {
effect: Effect.ALLOW,
resources: [replayerOutputEFSArn],
Expand Down
Loading

0 comments on commit 4beba00

Please sign in to comment.