diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index b6daa35ee..375127c3f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -6,6 +6,7 @@ import console_link.logic.backfill as logic_backfill import console_link.logic.snapshot as logic_snapshot import console_link.logic.metadata as logic_metadata +import console_link.logic.replay as logic_replay from console_link.models.utils import ExitCode from console_link.environment import Environment @@ -237,7 +238,61 @@ def status_backfill_cmd(ctx): raise click.ClickException(message) click.echo(message) -# ##################### METRICS ################### + +# ##################### REPLAY ################### + +@cli.group(name="replay") +@click.pass_obj +def replay_group(ctx): + """All actions related to replaying data""" + if ctx.env.replay is None: + raise click.UsageError("Replay is not set") + + +@replay_group.command(name="describe") +@click.pass_obj +def describe_replay_cmd(ctx): + click.echo(logic_replay.describe(ctx.env.replay, as_json=ctx.json)) + + +@replay_group.command(name="start") +@click.pass_obj +def start_replay_cmd(ctx): + exitcode, message = logic_replay.start(ctx.env.replay) + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(message) + + +@replay_group.command(name="stop") +@click.pass_obj +def stop_replay_cmd(ctx): + exitcode, message = logic_replay.stop(ctx.env.replay) + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(message) + + +@replay_group.command(name="scale") +@click.argument("units", type=int, required=True) +@click.pass_obj +def scale_replay_cmd(ctx, units: int): + exitcode, message = 
logic_replay.scale(ctx.env.replay, units) + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(message) + + +@replay_group.command(name="status") +@click.pass_obj +def status_replay_cmd(ctx): + exitcode, message = logic_replay.status(ctx.env.replay) + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(message) + + +# ##################### METADATA ################### @cli.group(name="metadata") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index 96ebae57d..b6ccf5518 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -6,6 +6,8 @@ from console_link.logic.backfill import get_backfill from console_link.models.backfill_base import Backfill from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot +from console_link.models.replayer_base import Replayer +from console_link.models.replayer_ecs import ECSReplayer import yaml from cerberus import Validator @@ -20,6 +22,12 @@ def get_snapshot(config: Dict, source_cluster: Cluster): return S3Snapshot(config, source_cluster) +def get_replayer(config: Dict): + if 'ecs' in config: + return ECSReplayer(config) + raise ValueError("Invalid replayer config") + + SCHEMA = { "source_cluster": {"type": "dict", "required": False}, "target_cluster": {"type": "dict", "required": True}, @@ -27,7 +35,8 @@ def get_snapshot(config: Dict, source_cluster: Cluster): "backfill": {"type": "dict", "required": False}, "metrics_source": {"type": "dict", "required": False}, "snapshot": {"type": "dict", "required": False}, - "metadata_migration": {"type": "dict", "required": False} + 
"metadata_migration": {"type": "dict", "required": False}, + "replay": {"type": "dict", "required": False} } @@ -38,6 +47,7 @@ class Environment: metrics_source: Optional[MetricsSource] = None snapshot: Optional[Snapshot] = None metadata: Optional[Metadata] = None + replay: Optional[Replayer] = None def __init__(self, config_file: str): logger.info(f"Loading config file: {config_file}") @@ -77,6 +87,10 @@ def __init__(self, config_file: str): else: logger.info("No backfill provided") + if 'replay' in self.config: + self.replay: Replayer = get_replayer(self.config["replay"]) + logger.info(f"Replay initialized: {self.replay}") + if 'snapshot' in self.config: self.snapshot: Snapshot = get_snapshot(self.config["snapshot"], source_cluster=self.source_cluster) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py new file mode 100644 index 000000000..714f3cdc9 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py @@ -0,0 +1,76 @@ +import json +import logging +from typing import Tuple +from console_link.models.utils import ExitCode +from console_link.models.replayer_base import Replayer +import yaml + + +logger = logging.getLogger(__name__) + + +def describe(replayer: Replayer, as_json=False) -> str: + response = replayer.describe() + if as_json: + return json.dumps(response) + return yaml.safe_dump(response) + + +def start(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]: + try: + result = replayer.start(*args, **kwargs) + except NotImplementedError: + logger.error(f"Start is not implemented for replayer {type(replayer).__name__}") + return ExitCode.FAILURE, f"Start is not implemented for replayer {type(replayer).__name__}" + except Exception as e: + logger.error(f"Failed to start replayer: {e}") + return 
ExitCode.FAILURE, f"Failure when starting replayer: {type(e).__name__} {e}" + + if result.success: + return ExitCode.SUCCESS, "Replayer started successfully." + "\n" + result.display() + return ExitCode.FAILURE, "Replayer start failed." + "\n" + result.display() + + +def stop(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]: + logger.info("Stopping replayer") + try: + result = replayer.stop(*args, **kwargs) + except NotImplementedError: + logger.error(f"Stop is not implemented for replayer {type(replayer).__name__}") + return ExitCode.FAILURE, f"Stop is not implemented for replayer {type(replayer).__name__}" + except Exception as e: + logger.error(f"Failed to stop replayer: {e}") + return ExitCode.FAILURE, f"Failure when stopping replayer: {type(e).__name__} {e}" + if result.success: + return ExitCode.SUCCESS, "Replayer stopped successfully." + "\n" + result.display() + return ExitCode.FAILURE, "Replayer stop failed." + "\n" + result.display() + + +def scale(replayer: Replayer, units: int, *args, **kwargs) -> Tuple[ExitCode, str]: + logger.info(f"Scaling replayer to {units} units") + try: + result = replayer.scale(units, *args, **kwargs) + except NotImplementedError: + logger.error(f"Scale is not implemented for replayer {type(replayer).__name__}") + return ExitCode.FAILURE, f"Scale is not implemented for replayer {type(replayer).__name__}" + except Exception as e: + logger.error(f"Failed to scale replayer: {e}") + return ExitCode.FAILURE, f"Failure when scaling replayer: {type(e).__name__} {e}" + if result.success: + return ExitCode.SUCCESS, "Replayer scaled successfully." + "\n" + result.display() + return ExitCode.FAILURE, "Replayer scale failed." 
+ "\n" + result.display() + + +def status(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]: + logger.info("Getting replayer status") + try: + status = replayer.get_status(*args, **kwargs) + except NotImplementedError: + logger.error(f"Status is not implemented for replayer {type(replayer).__name__}") + return ExitCode.FAILURE, f"Status is not implemented for replayer: {type(replayer).__name__}" + except Exception as e: + logger.error(f"Failed to get status of replayer: {e}") + return ExitCode.FAILURE, f"Failure when getting status of replayer: {type(e).__name__} {e}" + if status: + return ExitCode.SUCCESS, status.value + return ExitCode.FAILURE, "Replayer status retrieval failed." + "\n" + status diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py index bacaef1b1..fc3296011 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py @@ -92,3 +92,7 @@ def start(self, *args, **kwargs) -> CommandResult: def stop(self, *args, **kwargs) -> CommandResult: logger.info("Stopping RFS backfill by setting desired count to 0 instances") return self.ecs_client.set_desired_count(0) + + def scale(self, units: int, *args, **kwargs) -> CommandResult: + logger.info(f"Scaling RFS backfill by setting desired count to {units} instances") + return self.ecs_client.set_desired_count(units) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer.py deleted file mode 100644 index ede125ead..000000000 --- 
a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer.py +++ /dev/null @@ -1,82 +0,0 @@ -from enum import Enum -from typing import Dict -import boto3 -from cerberus import Validator - -DeploymentType = Enum('DeploymentType', ['DOCKER', 'ECS']) - -SCHEMA = { - 'deployment_type': { - 'type': 'string', - 'required': True - } -} - - -class BaseReplayer(): - """ - The Replayer base class - """ - def __init__(self, config: Dict) -> None: - self.config = config - v = Validator(SCHEMA) - if not v.validate(config): - raise ValueError("Invalid config file for replayer", v.errors) - - def start_replayer(self): - """ - Starts the replayer. - """ - raise NotImplementedError - - def stop_replayer(self): - """ - Stops the replayer. - """ - raise NotImplementedError - - -class LocalReplayer(BaseReplayer): - def start_replayer(self): - pass - - def stop_replayer(self): - pass - - -ECS_SCHEMA = { - 'cluster_name': { - 'type': 'string', - 'required': True - }, - 'service_name': { - 'type': 'string', - 'required': True - } -} - - -class ECSReplayer(BaseReplayer): - client = boto3.client('ecs') - - def __init__(self, config: Dict) -> None: - super().__init__(config) - v = Validator(ECS_SCHEMA) - if not v.validate(config): - raise ValueError("Invalid config file for replayer", v.errors) - self.cluster_name = config['cluster_name'] - self.service_name = config['service_name'] - - def start_replayer(self) -> None: - self.client.update_service( - cluster=self.cluster_name, - service=self.service_name, - desiredCount=1 - ) - - def stop_replayer(self) -> None: - self.client.update_service( - cluster=self.cluster_name, - service=self.service_name, - desiredCount=0 - ) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py new file mode 100644 
index 000000000..5b78ec6a5 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py @@ -0,0 +1,74 @@ +from enum import Enum +from typing import Dict +from abc import ABC, abstractmethod + +from console_link.models.schema_tools import contains_one_of +from console_link.models.command_result import CommandResult + +from cerberus import Validator + +DOCKER_REPLAY_SCHEMA = { + "type": "dict", + "schema": { + "socket": {"type": "string", "required": False} + } +} + +ECS_REPLAY_SCHEMA = { + "type": "dict", + "schema": { + "cluster_name": {"type": "string", "required": True}, + "service_name": {"type": "string", "required": True}, + "aws_region": {"type": "string", "required": False} + } +} + +SCHEMA = { + "replay": { + "type": "dict", + "schema": { + "docker": DOCKER_REPLAY_SCHEMA, + "ecs": ECS_REPLAY_SCHEMA, + "scale": {"type": "integer", "required": False, "min": 1} + }, + "check_with": contains_one_of({"docker", "ecs"}) + } +} + + +ReplayStatus = Enum("ReplayStatus", ["NOT_STARTED", "RUNNING", "STOPPED", "FAILED"]) + + +class Replayer(ABC): + """ + Interface for replaying data from kafka to a target cluster. + """ + def __init__(self, config: Dict) -> None: + v = Validator(SCHEMA) + self.config = config + if not v.validate({"replay": self.config}): + raise ValueError("Invalid config file for replay", v.errors) + self.default_scale = self.config.get("scale", 1) + + @abstractmethod + def start(self, *args, **kwargs) -> CommandResult: + """Begin running the replayer. After running start, the user should be able to assume that--barring exceptions + or failures--their data will begin playing against the target cluster.""" + pass + + @abstractmethod + def stop(self, *args, **kwargs) -> CommandResult: + """Stop or pause the replay. 
This does not make guarantees about resumeability.""" + pass + + @abstractmethod + def get_status(self, *args, **kwargs) -> ReplayStatus: + """Return a status""" + pass + + @abstractmethod + def scale(self, units: int, *args, **kwargs) -> CommandResult: + pass + + def describe(self) -> Dict: + return self.config diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_ecs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_ecs.py new file mode 100644 index 000000000..2b5216abc --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_ecs.py @@ -0,0 +1,31 @@ +from typing import Dict +from console_link.models.command_result import CommandResult +from console_link.models.ecs_service import ECSService +from console_link.models.replayer_base import Replayer, ReplayStatus + +import logging + +logger = logging.getLogger(__name__) + + +class ECSReplayer(Replayer): + def __init__(self, config: Dict) -> None: + super().__init__(config) + self.ecs_config = self.config["ecs"] + self.ecs_client = ECSService(self.ecs_config["cluster_name"], self.ecs_config["service_name"], + self.ecs_config.get("aws_region", None)) + + def start(self, *args, **kwargs) -> CommandResult: + logger.info(f"Starting ECS replayer by setting desired count to {self.default_scale} instances") + return self.ecs_client.set_desired_count(self.default_scale) + + def stop(self, *args, **kwargs) -> CommandResult: + logger.info("Stopping ECS replayer by setting desired count to 0 instances") + return self.ecs_client.set_desired_count(0) + + def get_status(self, *args, **kwargs) -> ReplayStatus: + raise NotImplementedError() + + def scale(self, units: int, *args, **kwargs) -> CommandResult: + logger.info(f"Scaling ECS replayer by setting desired count to {units} instances") + return 
self.ecs_client.set_desired_count(units) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml index 4e45ddd5c..104999821 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml @@ -27,3 +27,8 @@ metadata_migration: repo_uri: "s3://my-snapshot-bucket" aws_region: "us-east-2" min_replicas: 0 +replay: + ecs: + cluster_name: "my-cluster" + service_name: "my-service" + scale: 2 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py index e85ef8681..14d4e55c0 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_backfill.py @@ -173,3 +173,11 @@ def test_ecs_rfs_backfill_stop_sets_ecs_desired_count(ecs_rfs_backfill, mocker): assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 0) + + +def test_ecs_rfs_backfill_scale_sets_ecs_desired_count(ecs_rfs_backfill, mocker): + mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) + ecs_rfs_backfill.scale(3) + + assert isinstance(ecs_rfs_backfill, ECSRFSBackfill) + mock.assert_called_once_with(ecs_rfs_backfill.ecs_client, 3) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_replay.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_replay.py new file mode 100644 index 000000000..bad6ec646 --- /dev/null 
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_replay.py @@ -0,0 +1,87 @@ +import pathlib + +import pytest + +from console_link.environment import get_replayer +from console_link.models.replayer_base import Replayer +from console_link.models.replayer_ecs import ECSReplayer +from console_link.models.ecs_service import ECSService + +TEST_DATA_DIRECTORY = pathlib.Path(__file__).parent / "data" +AWS_REGION = "us-east-1" + + +def test_get_replayer_valid_ecs(): + ecs_config = { + "ecs": { + "cluster_name": "migration-aws-integ-ecs-cluster", + "service_name": "migration-aws-integ-traffic-replayer-default" + }, + "scale": 3 + } + replayer = get_replayer(ecs_config) + assert isinstance(replayer, ECSReplayer) + assert isinstance(replayer, Replayer) + + +def test_cant_instantiate_with_multiple_types(): + config = { + "ecs": { + "cluster_name": "migration-aws-integ-ecs-cluster", + "service_name": "migration-aws-integ-traffic-replayer-default" + }, + "docker": { + }, + "scale": 3 + } + with pytest.raises(ValueError) as excinfo: + get_replayer(config) + assert "Invalid config file for replay" in str(excinfo.value.args[0]) + assert "More than one value is present" in str(excinfo.value.args[1]['replay'][0]) + + +def test_replayer_start_sets_ecs_desired_count(mocker): + config = { + "ecs": { + "cluster_name": "migration-aws-integ-ecs-cluster", + "service_name": "migration-aws-integ-traffic-replayer-default" + }, + "scale": 3 + } + replayer = get_replayer(config) + mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) + replayer.start() + + assert isinstance(replayer, ECSReplayer) + mock.assert_called_once_with(replayer.ecs_client, config["scale"]) + + +def test_replayer_stop_sets_ecs_desired_count(mocker): + config = { + "ecs": { + "cluster_name": "migration-aws-integ-ecs-cluster", + "service_name": "migration-aws-integ-traffic-replayer-default" + }, + "scale": 3 + } + replayer = get_replayer(config) + mock = 
mocker.patch.object(ECSService, 'set_desired_count', autospec=True) + replayer.stop() + + assert isinstance(replayer, ECSReplayer) + mock.assert_called_once_with(replayer.ecs_client, 0) + + +def test_replayer_scale_sets_ecs_desired_count(mocker): + config = { + "ecs": { + "cluster_name": "migration-aws-integ-ecs-cluster", + "service_name": "migration-aws-integ-traffic-replayer-default" + } + } + replayer = get_replayer(config) + mock = mocker.patch.object(ECSService, 'set_desired_count', autospec=True) + replayer.scale(5) + + assert isinstance(replayer, ECSReplayer) + mock.assert_called_once_with(replayer.ecs_client, 5) diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts index 871fefc58..b1b6231fb 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts @@ -42,6 +42,22 @@ export class OSIBackfillYaml { } } +export class ECSReplayerYaml { + ecs: ECSService; + scale: number = 1; + + constructor() { + this.ecs = new ECSService(); + } + + toDict() { + return { + ecs: this.ecs, + scale: this.scale + }; + } +} + export class FileSystemSnapshotYaml { repo_path: string = ''; } @@ -81,6 +97,7 @@ export class ServicesYaml { backfill: RFSBackfillYaml | OSIBackfillYaml; snapshot?: SnapshotYaml; metadata_migration?: MetadataMigrationYaml; + replayer?: ECSReplayerYaml; stringify(): string { return yaml.stringify({ @@ -89,7 +106,8 @@ export class ServicesYaml { metrics_source: this.metrics_source, backfill: this.backfill?.toDict(), snapshot: this.snapshot?.toDict(), - metadata_migration: this.metadata_migration + metadata_migration: this.metadata_migration, + replay: this.replayer?.toDict() }, { 'nullStr': '' diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts 
b/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts index bb66faaf6..c6f775923 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/traffic-replayer-stack.ts @@ -15,6 +15,7 @@ import { import {StreamingSourceType} from "../streaming-source-type"; import { Duration } from "aws-cdk-lib"; import {OtelCollectorSidecar} from "./migration-otel-collector-sidecar"; +import { ECSReplayerYaml } from "../migration-services-yaml"; export interface TrafficReplayerProps extends StackPropsExt { @@ -31,6 +32,7 @@ export interface TrafficReplayerProps extends StackPropsExt { } export class TrafficReplayerStack extends MigrationServiceCore { + replayerYaml: ECSReplayerYaml; constructor(scope: Construct, id: string, props: TrafficReplayerProps) { super(scope, id, props) @@ -130,5 +132,9 @@ export class TrafficReplayerStack extends MigrationServiceCore { taskMemoryLimitMiB: 4096, ...props }); + + this.replayerYaml = new ECSReplayerYaml(); + this.replayerYaml.ecs.cluster_name = `migration-${props.stage}-ecs-cluster`; + this.replayerYaml.ecs.service_name = `migration-${props.stage}-traffic-replayer-${deployId}`; } } \ No newline at end of file diff --git a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts index ac48ca9d8..2093aa416 100644 --- a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts +++ b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts @@ -479,6 +479,7 @@ export class StackComposer { this.addDependentStacks(trafficReplayerStack, [networkStack, migrationStack, mskUtilityStack, kafkaBrokerStack, openSearchStack, osContainerStack]) this.stacks.push(trafficReplayerStack) + servicesYaml.replayer = trafficReplayerStack.replayerYaml; } let elasticsearchStack diff --git 
a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts index c72808449..f4c994f5f 100644 --- a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts @@ -1,10 +1,9 @@ import {createStackComposer} from "./test-utils"; import {Capture, Match, Template} from "aws-cdk-lib/assertions"; import {MigrationConsoleStack} from "../lib/service-stacks/migration-console-stack"; -import * as yaml from 'yaml'; - test('Test that IAM policy contains fetch migration IAM statements when fetch migration is enabled', () => { + jest.clearAllMocks() const contextOptions = { vpcEnabled: true, migrationAssistanceEnabled: true, @@ -56,40 +55,3 @@ test('Test that IAM policy does not contain fetch migration IAM statements when expect(runTaskStatement).toBeFalsy() expect(iamPassRoleStatement).toBeFalsy() }) - -test('Test that services yaml parameter is created', () => { - const contextOptions = { - vpcEnabled: true, - migrationAssistanceEnabled: true, - migrationConsoleServiceEnabled: true, - sourceClusterEndpoint: "https://test-cluster", - reindexFromSnapshotServiceEnabled: true, - } - - const stacks = createStackComposer(contextOptions) - - const migrationConsoleStack: MigrationConsoleStack = (stacks.stacks.filter((s) => s instanceof MigrationConsoleStack)[0]) as MigrationConsoleStack - const migrationConsoleStackTemplate = Template.fromStack(migrationConsoleStack) - - const valueCapture = new Capture(); - migrationConsoleStackTemplate.hasResourceProperties("AWS::SSM::Parameter", { - Type: "String", - Name: Match.stringLikeRegexp("/migration/.*/.*/servicesYamlFile"), - Value: valueCapture, - }); - const value = valueCapture.asObject() - expect(value).toBeDefined(); - expect(value['Fn::Join']).toBeInstanceOf(Array); - expect(value['Fn::Join'][1]).toBeInstanceOf(Array) - //
join the strings together to get the yaml file contents - const yamlFileContents = value['Fn::Join'][1].join('') - expect(yamlFileContents).toContain('source_cluster') - expect(yamlFileContents).toContain('target_cluster') - expect(yamlFileContents).toContain('metrics_source:\n cloudwatch:') - // Validates that the file can be parsed as valid yaml and has the expected fields - const parsedFromYaml = yaml.parse(yamlFileContents); - // Validates that the file has the expected fields - const expectedFields = ['source_cluster', 'target_cluster', 'metrics_source', 'backfill', 'snapshot', 'metadata_migration']; - expect(Object.keys(parsedFromYaml).length).toEqual(expectedFields.length) - expect(new Set(Object.keys(parsedFromYaml))).toEqual(new Set(expectedFields)) -}) \ No newline at end of file diff --git a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts index 0eb70d25f..a48ae447d 100644 --- a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts @@ -1,4 +1,15 @@ +import { ContainerImage } from "aws-cdk-lib/aws-ecs"; import { ClusterYaml, RFSBackfillYaml, ServicesYaml, SnapshotYaml } from "../lib/migration-services-yaml" +import { Template, Capture, Match } from "aws-cdk-lib/assertions"; +import { MigrationConsoleStack } from "../lib/service-stacks/migration-console-stack"; +import { createStackComposer } from "./test-utils"; +import * as yaml from 'yaml'; + + +// Mock using local Dockerfile (which may not exist and would fail synthesis) +jest.mock("aws-cdk-lib/aws-ecr-assets") +jest.spyOn(ContainerImage, 'fromDockerImageAsset').mockImplementation(() => ContainerImage.fromRegistry("ServiceImage")); + test('Test default servicesYaml can be stringified', () => { const servicesYaml = new ServicesYaml(); @@ -78,3 +89,41 @@ test('Test SnapshotYaml 
for s3 only includes s3', () => { expect(s3SnapshotDict["s3"]).toHaveProperty("repo_uri"); expect(s3SnapshotDict).not.toHaveProperty("fs"); }) + +test('Test that services yaml parameter is created by migration console stack', () => { + const contextOptions = { + vpcEnabled: true, + migrationAssistanceEnabled: true, + migrationConsoleServiceEnabled: true, + sourceClusterEndpoint: "https://test-cluster", + reindexFromSnapshotServiceEnabled: true, + trafficReplayerServiceEnabled: true + } + + const stacks = createStackComposer(contextOptions) + + const migrationConsoleStack: MigrationConsoleStack = (stacks.stacks.filter((s) => s instanceof MigrationConsoleStack)[0]) as MigrationConsoleStack + const migrationConsoleStackTemplate = Template.fromStack(migrationConsoleStack) + + const valueCapture = new Capture(); + migrationConsoleStackTemplate.hasResourceProperties("AWS::SSM::Parameter", { + Type: "String", + Name: Match.stringLikeRegexp("/migration/.*/.*/servicesYamlFile"), + Value: valueCapture, + }); + const value = valueCapture.asObject() + expect(value).toBeDefined(); + expect(value['Fn::Join']).toBeInstanceOf(Array); + expect(value['Fn::Join'][1]).toBeInstanceOf(Array) + // join the strings together to get the yaml file contents + const yamlFileContents = value['Fn::Join'][1].join('') + expect(yamlFileContents).toContain('source_cluster') + expect(yamlFileContents).toContain('target_cluster') + expect(yamlFileContents).toContain('metrics_source:\n cloudwatch:') + // Validates that the file can be parsed as valid yaml and has the expected fields + const parsedFromYaml = yaml.parse(yamlFileContents); + // Validates that the file has the expected fields + const expectedFields = ['source_cluster', 'target_cluster', 'metrics_source', 'backfill', 'snapshot', 'metadata_migration', 'replay']; + expect(Object.keys(parsedFromYaml).length).toEqual(expectedFields.length) + expect(new Set(Object.keys(parsedFromYaml))).toEqual(new Set(expectedFields)) +}) \ No newline at end of
file