From e0ec666674843fa652f304f933f4fa9e8a73c516 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 20 Jun 2024 00:00:36 -0500 Subject: [PATCH 1/5] Add initial migration console snapshot support Signed-off-by: Andre Kurait --- .../src/main/java/com/rfs/CreateSnapshot.java | 8 +- .../com/rfs/common/ConnectionDetails.java | 6 ++ .../main/java/com/rfs/common/RestClient.java | 48 ++++++++--- .../lib/console_link/console_link/cli.py | 34 +++++++- .../console_link/console_link/environment.py | 11 +++ .../console_link/logic/backfill.py | 7 +- .../console_link/logic/snapshot.py | 28 ++++++ .../console_link/models/snapshot.py | 85 +++++++++++++++++++ .../console_link/console_link/models/utils.py | 5 ++ .../lib/console_link/services.yaml | 6 +- .../lib/console_link/tests/test_cli.py | 31 +++++++ 11 files changed, 249 insertions(+), 20 deletions(-) create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py diff --git a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java index 5b2b72330..38865b0ca 100644 --- a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java @@ -48,6 +48,9 @@ public static class Args { @Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false) public String targetPass = null; + + @Parameter(names = {"--insecure"}, description = "Allow untrusted SSL certificates", required = false) + public boolean insecure = false; } public static void main(String[] args) throws Exception { @@ -67,9 +70,10 @@ public static void main(String[] args) throws Exception { final String targetHost = arguments.targetHost; final String targetUser = arguments.targetUser; final String targetPass = arguments.targetPass; + final boolean insecure = arguments.insecure; - final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); - final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); + final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, insecure); + final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, insecure); TryHandlePhaseFailure.executeWithTryCatch(() -> { log.info("Running RfsWorker"); diff --git a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index 4e2edb7b9..54a0d9715 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -24,9 +24,15 @@ public static enum Protocol { public final String username; public final String password; public final AuthType authType; + public final boolean insecure; public ConnectionDetails(String url, String username, String password) { + this(url, username, password, false); + } + + public ConnectionDetails(String url, String username, String password, boolean insecure) { this.url = url; // http://localhost:9200 + this.insecure = insecure; // If the username is provided, the password must be as well, and vice versa if ((username == null && password != null) || (username != null && password == null)) { diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index f25e666d3..a299c9ea8 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -2,9 +2,16 @@ import java.util.Base64; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import lombok.SneakyThrows; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.SslProvider; import reactor.netty.ByteBufMono; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; public class RestClient { public static class Response { @@ -22,20 +29,41 @@ public Response(int responseCode, String responseBody, String responseMessage) { public final ConnectionDetails connectionDetails; private final HttpClient client; + @SneakyThrows public RestClient(ConnectionDetails connectionDetails) { this.connectionDetails = connectionDetails; + SslProvider sslProvider; + if (connectionDetails.insecure) { + SslContext sslContext = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + + sslProvider = SslProvider.builder() + .sslContext(sslContext) + .handlerConfigurator(sslHandler -> { + SSLEngine engine = sslHandler.engine(); + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm(null); + engine.setSSLParameters(sslParameters); + }) + .build(); + } else { + sslProvider = SslProvider.defaultClientProvider(); + } + this.client = HttpClient.create() - .baseUrl(connectionDetails.url) - .headers(h -> { - h.add("Content-Type", "application/json"); - h.add("User-Agent", "RfsWorker-1.0"); - if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { - String credentials = connectionDetails.username + ":" + connectionDetails.password; - String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); - h.add("Authorization", "Basic " + encodedCredentials); - } - }); + .secure(sslProvider) + .baseUrl(connectionDetails.url) + .headers(h -> { + h.add("Content-Type", "application/json"); + h.add("User-Agent", "RfsWorker-1.0"); + if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { + String credentials = connectionDetails.username + ":" + connectionDetails.password; + String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + h.add("Authorization", "Basic " + encodedCredentials); + } + }); } public Mono getAsync(String path) { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index 281970489..b0826a10a 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -4,9 +4,13 @@ import console_link.logic.clusters as logic_clusters import console_link.logic.metrics as logic_metrics import console_link.logic.backfill as logic_backfill -from console_link.logic.backfill import ExitCode +import console_link.logic.snapshot as logic_snapshot + +from console_link.models.utils import ExitCode from console_link.environment import Environment from console_link.models.metrics_source import Component, MetricStatistic +from console_link.models.snapshot import SnapshotStatus + import logging logger = logging.getLogger(__name__) @@ -88,6 +92,34 @@ def replayer_group(ctx): def start_replayer_cmd(ctx): ctx.env.replayer.start() +# ##################### SNAPSHOT ################### + +@cli.group(name="snapshot") +@click.pass_obj +def snapshot_group(ctx): + """All actions related to snapshot creation""" + if ctx.env.snapshot is None: + raise click.UsageError("Snapshot is not set") + + +@snapshot_group.command(name="create") +@click.pass_obj +def create_snapshot_cmd(ctx): + """Create a snapshot of the source cluster""" + snapshot = ctx.env.snapshot + status, message = logic_snapshot.create(snapshot) + if status != SnapshotStatus.COMPLETED: + raise click.ClickException(message) + click.echo(message) + + +@snapshot_group.command(name="status") +@click.pass_obj +def status_snapshot_cmd(ctx): + """Check the status of the snapshot""" + snapshot = ctx.env.snapshot + _, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster, target_cluster=ctx.env.target_cluster) + click.echo(message) # ##################### BACKFILL ################### diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index cf3b85477..8caf9f228 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -5,6 +5,7 @@ from console_link.logic.metrics import get_metrics_source from console_link.logic.backfill import get_backfill from console_link.models.backfill_base import Backfill +from console_link.models.snapshot import Snapshot, S3Snapshot import yaml from cerberus import Validator @@ -17,6 +18,7 @@ "replayer": {"type": "dict", "required": False}, "backfill": {"type": "dict", "required": False}, "metrics_source": {"type": "dict", "required": False}, + "snapshot": {"type": "dict", "required": False}, } @@ -25,6 +27,7 @@ class Environment: target_cluster: Optional[Cluster] = None backfill: Optional[Backfill] = None metrics_source: Optional[MetricsSource] = None + snapshot: Optional[Snapshot] = None def __init__(self, config_file: str): self.config_file = config_file @@ -61,3 +64,11 @@ def __init__(self, config_file: str): logger.info(f"Backfill migration initialized: {self.backfill}") else: logger.info("No backfill provided") + + if 'snapshot' in self.config: + self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"], + source_cluster=self.source_cluster, + target_cluster=self.target_cluster) + logger.info(f"Snapshot initialized: {self.snapshot}") + else: + logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py index b7e34e012..e2fd9c66c 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py @@ -2,7 +2,7 @@ import json import logging from typing import Dict, Optional, Tuple - +from console_link.models.utils import ExitCode from console_link.models.backfill_osi import OpenSearchIngestionBackfill from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill from console_link.models.cluster import Cluster @@ -13,11 +13,6 @@ logger = logging.getLogger(__name__) -class ExitCode(Enum): - SUCCESS = 0 - FAILURE = 1 - - BackfillType = Enum("BackfillType", ["opensearch_ingestion", "reindex_from_snapshot"]) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py new file mode 100644 index 000000000..37ab68b26 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py @@ -0,0 +1,28 @@ +import logging +from typing import Tuple +from console_link.models.snapshot import Snapshot, SnapshotStatus + +logger = logging.getLogger(__name__) + +def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: + logger.info(f"Creating snapshot with {args=} and {kwargs=}") + try: + result = snapshot.create(*args, **kwargs) + except Exception as e: + logger.error(f"Failed to create snapshot: {e}") + return SnapshotStatus.FAILED, f"Failure when creating snapshot: {type(e).__name__} {e}" + + if result.success: + return SnapshotStatus.COMPLETED, "Snapshot created successfully." + "\n" + result.value + return SnapshotStatus.FAILED, "Snapshot creation failed." + "\n" + result.value + +def status(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: + logger.info("Getting snapshot status") + try: + result = snapshot.status(*args, **kwargs) + except Exception as e: + logger.error(f"Failed to get status of snapshot: {e}") + return SnapshotStatus.FAILED, f"Failure when getting status of snapshot: {type(e).__name__} {e}" + if result.success: + return SnapshotStatus.COMPLETED, result.value + return SnapshotStatus.FAILED, "Snapshot status retrieval failed." + "\n" + result.value diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py new file mode 100644 index 000000000..72ca5de6c --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -0,0 +1,85 @@ +from abc import ABC, abstractmethod +from enum import Enum +import logging +import subprocess +from typing import Dict, Optional, Tuple +from console_link.models.cluster import Cluster +from console_link.models.command_result import CommandResult +from cerberus import Validator + +logger = logging.getLogger(__name__) + +SnapshotStatus = Enum( + "SnapshotStatus", [ + "NOT_STARTED", + "RUNNING", + "COMPLETED", + "FAILED" + ]) + +class Snapshot(ABC): + """ + Interface for creating and managing snapshots. + """ + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + self.config = config + self.source_cluster = source_cluster + self.target_cluster = target_cluster + + @abstractmethod + def create(self, *args, **kwargs) -> CommandResult: + """Create a snapshot.""" + pass + + @abstractmethod + def status(self, *args, **kwargs) -> CommandResult: + """Get the status of the snapshot.""" + pass + + +S3_SNAPSHOT_SCHEMA = { + 'snapshot_name': { + 'type': 'string', + 'required': True + }, + 's3_repo_uri': { + 'type': 'string', + 'required': True + }, + 's3_region': { + 'type': 'string', + 'required': True + } +} + +class S3Snapshot(Snapshot): + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + super().__init__(config, source_cluster, target_cluster) + v = Validator(S3_SNAPSHOT_SCHEMA) + if not v.validate(config): + raise ValueError("Invalid config file for snapshot", v.errors) + self.snapshot_name = config['snapshot_name'] + self.s3_repo_uri = config['s3_repo_uri'] + self.s3_region = config['s3_region'] + + def create(self, *args, **kwargs) -> CommandResult: + command = [ + "/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", self.snapshot_name, + "--s3-repo-uri", self.s3_repo_uri, + "--s3-region", self.s3_region, + "--source-host", self.source_cluster.endpoint, + "--target-host", self.target_cluster.endpoint, + ] + if self.source_cluster.allow_insecure or self.target_cluster.allow_insecure: + command.append("--insecure") + logger.info(f"Creating snapshot with command: {' '.join(command)}") + try: + subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) + logger.info(f"Snapshot {self.config['snapshot_name']} created successfully") + return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create snapshot: {str(e)}") + return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)} {e.output} {e.stderr}") + def status(self, *args, **kwargs) -> CommandResult: + return CommandResult(success=False, value=f"Command not implemented") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py index 7af59c925..873bd5d9b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py @@ -1,4 +1,5 @@ # define a custom exception for aws api errors +from enum import Enum from typing import Dict @@ -20,3 +21,7 @@ def raise_for_aws_api_error(response: Dict) -> None: "Non-2XX status code received", status_code=status_code ) + +class ExitCode(Enum): + SUCCESS = 0 + FAILURE = 1 \ No newline at end of file diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 85ed9d80a..521992379 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -1,5 +1,5 @@ source_cluster: - endpoint: "https://capture-proxy-es:9200" + endpoint: "https://capture-proxy:9200" allow_insecure: true basic_auth: username: "admin" @@ -16,3 +16,7 @@ metrics_source: backfill: reindex_from_snapshot: docker: +snapshot: + snapshot_name: "snapshot_2023_01_01" + s3_repo_uri: "s3://my-snapshot-bucket" + s3_region: "us-east-2" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index bb85e1a3f..a74b2dc9f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -1,4 +1,5 @@ import pathlib +from console_link.models.command_result import CommandResult import requests_mock @@ -66,6 +67,35 @@ def test_cli_with_backfill_describe(runner, env, mocker): assert result.exit_code == 0 +def test_cli_snapshot_create(runner, env, mocker): + mock_create = mocker.patch('console_link.models.snapshot.Snapshot.create') + + # Set the mock return values + mock_create.return_value = CommandResult(success=True, value="Snapshot created successfully") + + # Test snapshot creation + result_create = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'create'], catch_exceptions=True) + assert result_create.exit_code == 0 + assert "Snapshot created successfully" in result_create.output + + # Ensure the mocks were called + mock_create.assert_called_once() + +@pytest.mark.skip(reason="Not implemented yet") +def test_cli_snapshot_status(runner, env, mocker): + mock_status = mocker.patch('console_link.models.snapshot.Snapshot.status') + + mock_status.return_value = CommandResult(success=True, value="Snapshot status: COMPLETED") + + # Test snapshot status + result_status = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'status'], catch_exceptions=True) + assert result_status.exit_code == 0 + assert "Snapshot status: COMPLETED" in result_status.output + + # Ensure the mocks were called + mock_status.assert_called_once() + + source_cat_indices = """ green open logs-221998 pKNVNlhcRuuUXlwPJag9Kg 5 0 1000 0 167.4kb 167.4kb green open geonames DlT1Qp-7SqaARuECxTaYvw 5 0 1000 0 343.2kb 343.2kb @@ -95,3 +125,4 @@ def test_cli_cat_indices_e2e(runner, env): assert 'TARGET CLUSTER' in result.output assert source_cat_indices in result.output assert target_cat_indices in result.output + From c19a155c26db2ad85ef1c0a5ad019b7138666a33 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 20 Jun 2024 10:19:46 -0500 Subject: [PATCH 2/5] Send snapshot output while command is running Signed-off-by: Andre Kurait --- .../src/main/java/com/rfs/CreateSnapshot.java | 14 +++++++++----- .../console_link/console_link/models/snapshot.py | 14 ++++++++++---- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java index 38865b0ca..51ebb508b 100644 --- a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java @@ -40,6 +40,9 @@ public static class Args { @Parameter(names = {"--source-password"}, description = "Optional. The source password; if not provided, will assume no auth on source", required = false) public String sourcePass = null; + @Parameter(names = {"--source-insecure"}, description = "Allow untrusted SSL certificates for source", required = false) + public boolean sourceInsecure = false; + @Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true) public String targetHost; @@ -49,8 +52,8 @@ public static class Args { @Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false) public String targetPass = null; - @Parameter(names = {"--insecure"}, description = "Allow untrusted SSL certificates", required = false) - public boolean insecure = false; + @Parameter(names = {"--target-insecure"}, description = "Allow untrusted SSL certificates for target", required = false) + public boolean targetInsecure = false; } public static void main(String[] args) throws Exception { @@ -70,10 +73,11 @@ public static void main(String[] args) throws Exception { final String targetHost = arguments.targetHost; final String targetUser = arguments.targetUser; final String targetPass = arguments.targetPass; - final boolean insecure = arguments.insecure; + final boolean sourceInsecure = arguments.sourceInsecure; + final boolean targetInsecure = arguments.targetInsecure; - final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, insecure); - final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, insecure); + final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, sourceInsecure); + final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, targetInsecure); TryHandlePhaseFailure.executeWithTryCatch(() -> { log.info("Running RfsWorker"); diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 72ca5de6c..930ecbb86 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -71,15 +71,21 @@ def create(self, *args, **kwargs) -> CommandResult: "--source-host", self.source_cluster.endpoint, "--target-host", self.target_cluster.endpoint, ] - if self.source_cluster.allow_insecure or self.target_cluster.allow_insecure: - command.append("--insecure") + + if self.source_cluster.allow_insecure: + command.append("--source-insecure") + if self.target_cluster.allow_insecure: + command.append("--target-insecure") + logger.info(f"Creating snapshot with command: {' '.join(command)}") try: - subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) + # Pass None to stdout and stderr to not capture output and show in terminal + subprocess.run(command, stdout=None, stderr=None, text=True, check=True) logger.info(f"Snapshot {self.config['snapshot_name']} created successfully") return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully") except subprocess.CalledProcessError as e: logger.error(f"Failed to create snapshot: {str(e)}") - return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)} {e.output} {e.stderr}") + return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}") + def status(self, *args, **kwargs) -> CommandResult: return CommandResult(success=False, value=f"Command not implemented") From 6a4764185c6d330b3d9aa0f3bb8faf46a8dcf1b0 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 20 Jun 2024 10:45:55 -0500 Subject: [PATCH 3/5] Fix test_cli.py for snapshot create Signed-off-by: Andre Kurait --- .../lib/console_link/console_link/cli.py | 5 ++- .../console_link/console_link/environment.py | 4 +-- .../console_link/logic/snapshot.py | 2 ++ .../console_link/models/snapshot.py | 9 ++--- .../console_link/console_link/models/utils.py | 3 +- .../lib/console_link/tests/data/services.yaml | 4 +++ .../lib/console_link/tests/test_cli.py | 36 ++++++++++--------- 7 files changed, 39 insertions(+), 24 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index b0826a10a..5d6952835 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -94,6 +94,7 @@ def start_replayer_cmd(ctx): # ##################### SNAPSHOT ################### + @cli.group(name="snapshot") @click.pass_obj def snapshot_group(ctx): @@ -118,7 +119,8 @@ def create_snapshot_cmd(ctx): def status_snapshot_cmd(ctx): """Check the status of the snapshot""" snapshot = ctx.env.snapshot - _, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster, target_cluster=ctx.env.target_cluster) + _, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster, + target_cluster=ctx.env.target_cluster) click.echo(message) # ##################### BACKFILL ################### @@ -126,6 +128,7 @@ def status_snapshot_cmd(ctx): # As we add other forms of backfill migrations, we should incorporate a way to dynamically allow different sets of # arguments depending on the type of backfill migration + @cli.group(name="backfill") @click.pass_obj def backfill_group(ctx): diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index 8caf9f228..d3b83b470 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -67,8 +67,8 @@ def __init__(self, config_file: str): if 'snapshot' in self.config: self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"], - source_cluster=self.source_cluster, - target_cluster=self.target_cluster) + source_cluster=self.source_cluster, + target_cluster=self.target_cluster) logger.info(f"Snapshot initialized: {self.snapshot}") else: logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py index 37ab68b26..70a680ff1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py @@ -4,6 +4,7 @@ logger = logging.getLogger(__name__) + def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: logger.info(f"Creating snapshot with {args=} and {kwargs=}") try: @@ -16,6 +17,7 @@ def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: return SnapshotStatus.COMPLETED, "Snapshot created successfully." + "\n" + result.value return SnapshotStatus.FAILED, "Snapshot creation failed." + "\n" + result.value + def status(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: logger.info("Getting snapshot status") try: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 930ecbb86..690ad7f7b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -2,7 +2,7 @@ from enum import Enum import logging import subprocess -from typing import Dict, Optional, Tuple +from typing import Dict, Optional from console_link.models.cluster import Cluster from console_link.models.command_result import CommandResult from cerberus import Validator @@ -14,8 +14,8 @@ "NOT_STARTED", "RUNNING", "COMPLETED", - "FAILED" - ]) + "FAILED"]) + class Snapshot(ABC): """ @@ -52,6 +52,7 @@ def status(self, *args, **kwargs) -> CommandResult: } } + class S3Snapshot(Snapshot): def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: super().__init__(config, source_cluster, target_cluster) @@ -88,4 +89,4 @@ def create(self, *args, **kwargs) -> CommandResult: return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}") def status(self, *args, **kwargs) -> CommandResult: - return CommandResult(success=False, value=f"Command not implemented") + return CommandResult(success=False, value="Command not implemented") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py index 873bd5d9b..ea31fd3a1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py @@ -22,6 +22,7 @@ def raise_for_aws_api_error(response: Dict) -> None: status_code=status_code ) + class ExitCode(Enum): SUCCESS = 0 - FAILURE = 1 \ No newline at end of file + FAILURE = 1 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml index 85ed9d80a..abca9d89a 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml @@ -16,3 +16,7 @@ metrics_source: backfill: reindex_from_snapshot: docker: +snapshot: + snapshot_name: "test_snapshot" + s3_repo_uri: "s3://test-bucket" + s3_region: "us-east-2" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index a74b2dc9f..0b464a37c 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -1,5 +1,5 @@ import pathlib -from console_link.models.command_result import CommandResult +from console_link.models.snapshot import SnapshotStatus import requests_mock @@ -68,32 +68,37 @@ def test_cli_with_backfill_describe(runner, env, mocker): def test_cli_snapshot_create(runner, env, mocker): - mock_create = mocker.patch('console_link.models.snapshot.Snapshot.create') + mock = mocker.patch('console_link.logic.snapshot.create') - # Set the mock return values - mock_create.return_value = CommandResult(success=True, value="Snapshot created successfully") + # Set the mock return value + mock.return_value = SnapshotStatus.COMPLETED, "Snapshot created successfully." # Test snapshot creation - result_create = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'create'], catch_exceptions=True) - assert result_create.exit_code == 0 - assert "Snapshot created successfully" in result_create.output + result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'create'], + catch_exceptions=True) + + assert result.exit_code == 0 + assert "Snapshot created successfully" in result.output # Ensure the mocks were called - mock_create.assert_called_once() + mock.assert_called_once() + @pytest.mark.skip(reason="Not implemented yet") def test_cli_snapshot_status(runner, env, mocker): - mock_status = mocker.patch('console_link.models.snapshot.Snapshot.status') + mock = mocker.patch('console_link.logic.snapshot.status') + + # Set the mock return value + mock.return_value = SnapshotStatus.COMPLETED, "Snapshot status: COMPLETED" - mock_status.return_value = CommandResult(success=True, value="Snapshot status: COMPLETED") - # Test snapshot status - result_status = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'status'], catch_exceptions=True) - assert result_status.exit_code == 0 - assert "Snapshot status: COMPLETED" in result_status.output + result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'status'], + catch_exceptions=True) + assert result.exit_code == 0 + assert "Snapshot status: COMPLETED" in result.output # Ensure the mocks were called - mock_status.assert_called_once() + mock.assert_called_once() source_cat_indices = """ @@ -125,4 +130,3 @@ def test_cli_cat_indices_e2e(runner, env): assert 'TARGET CLUSTER' in result.output assert source_cat_indices in result.output assert target_cat_indices in result.output - From 6a31fab5a32c95a8c9bbd9dc7aee1b57da2fb2a2 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 20 Jun 2024 12:11:54 -0500 Subject: [PATCH 4/5] Add cdk for snapshot create Signed-off-by: Andre Kurait --- .../lib/migration-services-yaml.ts | 18 +++++++++++++++++- .../reindex-from-snapshot-stack.ts | 8 ++++++-- .../lib/stack-composer.ts | 1 + .../test/migration-console-stack.test.ts | 8 +++++--- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts index eb5e8d95c..514834ddd 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts @@ -42,18 +42,34 @@ export class OSIBackfillYaml { } } +export class S3SnapshotYaml { + snapshot_name: string = ''; + s3_repo_uri: string = ''; + s3_region: string = ''; + + toDict() { + return { + snapshot_name: this.snapshot_name, + s3_repo_uri: this.s3_repo_uri, + s3_region: this.s3_region + }; + } +} + export class ServicesYaml { source_cluster: ClusterYaml; target_cluster: ClusterYaml; metrics_source: MetricsSourceYaml = new MetricsSourceYaml(); backfill: RFSBackfillYaml | OSIBackfillYaml; + snapshot?: S3SnapshotYaml; stringify(): string { return yaml.stringify({ source_cluster: this.source_cluster, target_cluster: this.target_cluster, metrics_source: this.metrics_source, - backfill: this.backfill?.toDict() + backfill: this.backfill?.toDict(), + snapshot: this.snapshot?.toDict() }, { 'nullStr': '' diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts index 78f3ddcc7..4b2f934d4 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts @@ -11,7 +11,7 @@ import { createOpenSearchServerlessIAMAccessPolicy, getMigrationStringParameterValue } from "../common-utilities"; -import { RFSBackfillYaml } from "../migration-services-yaml"; +import { RFSBackfillYaml, S3SnapshotYaml } from "../migration-services-yaml"; export interface ReindexFromSnapshotProps extends StackPropsExt { @@ -23,6 +23,7 @@ export interface ReindexFromSnapshotProps extends StackPropsExt { export class ReindexFromSnapshotStack extends MigrationServiceCore { rfsBackfillYaml: RFSBackfillYaml; + rfsSnapshotYaml: S3SnapshotYaml; constructor(scope: Construct, id: string, props: ReindexFromSnapshotProps) { super(scope, id, props) @@ -84,7 +85,10 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { this.rfsBackfillYaml = new RFSBackfillYaml(); this.rfsBackfillYaml.ecs.cluster_name = `migration-${props.stage}-ecs-cluster`; this.rfsBackfillYaml.ecs.service_name = `migration-${props.stage}-reindex-from-snapshot`; - + this.rfsSnapshotYaml = new S3SnapshotYaml(); + this.rfsSnapshotYaml.s3_repo_uri = s3Uri; + this.rfsSnapshotYaml.s3_region = this.region; + this.rfsSnapshotYaml.snapshot_name = "rfs-snapshot"; } } diff --git a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts index 88591d023..ac48ca9d8 100644 --- a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts +++ b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts @@ -434,6 +434,7 @@ export class StackComposer { this.addDependentStacks(reindexFromSnapshotStack, [migrationStack, openSearchStack, osContainerStack]) this.stacks.push(reindexFromSnapshotStack) servicesYaml.backfill = reindexFromSnapshotStack.rfsBackfillYaml; + servicesYaml.snapshot = reindexFromSnapshotStack.rfsSnapshotYaml; } diff --git a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts index 06e437638..7c5fac46e 100644 --- a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts @@ -63,6 +63,7 @@ test('Test that services yaml parameter is created', () => { migrationAssistanceEnabled: true, migrationConsoleServiceEnabled: true, sourceClusterEndpoint: "https://test-cluster", + reindexFromSnapshotServiceEnabled: true, } const stacks = createStackComposer(contextOptions) @@ -85,9 +86,10 @@ test('Test that services yaml parameter is created', () => { expect(yamlFileContents).toContain('source_cluster') expect(yamlFileContents).toContain('target_cluster') expect(yamlFileContents).toContain('metrics_source:\n cloudwatch:') - // Validates that the file can be parsed as valid yaml and has the expected fields const parsedFromYaml = yaml.parse(yamlFileContents); - expect(Object.keys(parsedFromYaml)).toEqual(['source_cluster', 'target_cluster', 'metrics_source']) - + // Validates that the file has the expected fields + const expectedFields = ['source_cluster', 'target_cluster', 'metrics_source', 'backfill', 'snapshot']; + expect(Object.keys(parsedFromYaml).length).toEqual(expectedFields.length) + expect(new Set(Object.keys(parsedFromYaml))).toEqual(new Set(expectedFields)) }) \ No newline at end of file From 11dc52402a82db6b4253bbc08e2878bf0ca7494b Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 20 Jun 2024 12:29:03 -0500 Subject: [PATCH 5/5] Fix cluster allow_insecure Signed-off-by: Andre Kurait --- .../lib/console_link/console_link/models/cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index ade376e45..4b4eff14e 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -66,6 +66,7 @@ class Cluster: aws_secret_arn: Optional[str] = None auth_type: Optional[AuthMethod] = None auth_details: Optional[Dict[str, Any]] = None + allow_insecure: bool = None def __init__(self, config: Dict) -> None: logger.info(f"Initializing cluster with config: {config}") @@ -74,8 +75,7 @@ def __init__(self, config: Dict) -> None: raise ValueError("Invalid config file for cluster", v.errors) self.endpoint = config["endpoint"] - if self.endpoint.startswith("https"): - self.allow_insecure = config.get("allow_insecure", False) + self.allow_insecure = config.get("allow_insecure", False) if 'no_auth' in config: self.auth_type = AuthMethod.NO_AUTH elif 'basic_auth' in config: