diff --git a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java index 5b2b72330f..38865b0caa 100644 --- a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java @@ -48,6 +48,9 @@ public static class Args { @Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false) public String targetPass = null; + + @Parameter(names = {"--insecure"}, description = "Allow untrusted SSL certificates", required = false) + public boolean insecure = false; } public static void main(String[] args) throws Exception { @@ -67,9 +70,10 @@ public static void main(String[] args) throws Exception { final String targetHost = arguments.targetHost; final String targetUser = arguments.targetUser; final String targetPass = arguments.targetPass; + final boolean insecure = arguments.insecure; - final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass); - final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); + final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, insecure); + final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, insecure); TryHandlePhaseFailure.executeWithTryCatch(() -> { log.info("Running RfsWorker"); diff --git a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java index 4e2edb7b9e..54a0d97151 100644 --- a/RFS/src/main/java/com/rfs/common/ConnectionDetails.java +++ b/RFS/src/main/java/com/rfs/common/ConnectionDetails.java @@ -24,9 +24,15 @@ public static enum Protocol { public final String username; public final String password; public final AuthType authType; + public final boolean insecure; public ConnectionDetails(String url, String username, String password) { + this(url, username, password, false); + } + + public ConnectionDetails(String url, String username, String password, boolean insecure) { this.url = url; // http://localhost:9200 + this.insecure = insecure; // If the username is provided, the password must be as well, and vice versa if ((username == null && password != null) || (username != null && password == null)) { diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index f25e666d38..a299c9ea83 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -2,9 +2,16 @@ import java.util.Base64; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import lombok.SneakyThrows; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.SslProvider; import reactor.netty.ByteBufMono; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; public class RestClient { public static class Response { @@ -22,20 +29,41 @@ public Response(int responseCode, String responseBody, String responseMessage) { public final ConnectionDetails connectionDetails; private final HttpClient client; + @SneakyThrows public RestClient(ConnectionDetails connectionDetails) { this.connectionDetails = connectionDetails; + SslProvider sslProvider; + if (connectionDetails.insecure) { + SslContext sslContext = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + + sslProvider = SslProvider.builder() + .sslContext(sslContext) + .handlerConfigurator(sslHandler -> { + SSLEngine engine = sslHandler.engine(); + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm(null); + engine.setSSLParameters(sslParameters); + }) + .build(); + } else { + sslProvider = SslProvider.defaultClientProvider(); + } + this.client = HttpClient.create() - .baseUrl(connectionDetails.url) - .headers(h -> { - h.add("Content-Type", "application/json"); - h.add("User-Agent", "RfsWorker-1.0"); - if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { - String credentials = connectionDetails.username + ":" + connectionDetails.password; - String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); - h.add("Authorization", "Basic " + encodedCredentials); - } - }); + .secure(sslProvider) + .baseUrl(connectionDetails.url) + .headers(h -> { + h.add("Content-Type", "application/json"); + h.add("User-Agent", "RfsWorker-1.0"); + if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) { + String credentials = connectionDetails.username + ":" + connectionDetails.password; + String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + h.add("Authorization", "Basic " + encodedCredentials); + } + }); } public Mono getAsync(String path) { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index 2819704899..b0826a10ae 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -4,9 +4,13 @@ import console_link.logic.clusters as logic_clusters import console_link.logic.metrics as logic_metrics import console_link.logic.backfill as logic_backfill -from console_link.logic.backfill import ExitCode +import console_link.logic.snapshot as logic_snapshot + +from console_link.models.utils import ExitCode from console_link.environment import Environment from console_link.models.metrics_source import Component, MetricStatistic +from console_link.models.snapshot import SnapshotStatus + import logging logger = logging.getLogger(__name__) @@ -88,6 +92,34 @@ def replayer_group(ctx): def start_replayer_cmd(ctx): ctx.env.replayer.start() +# ##################### SNAPSHOT ################### + +@cli.group(name="snapshot") +@click.pass_obj +def snapshot_group(ctx): + """All actions related to snapshot creation""" + if ctx.env.snapshot is None: + raise click.UsageError("Snapshot is not set") + + +@snapshot_group.command(name="create") +@click.pass_obj +def create_snapshot_cmd(ctx): + """Create a snapshot of the source cluster""" + snapshot = ctx.env.snapshot + status, message = logic_snapshot.create(snapshot) + if status != SnapshotStatus.COMPLETED: + raise click.ClickException(message) + click.echo(message) + + +@snapshot_group.command(name="status") +@click.pass_obj +def status_snapshot_cmd(ctx): + """Check the status of the snapshot""" + snapshot = ctx.env.snapshot + _, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster, target_cluster=ctx.env.target_cluster) + click.echo(message) # ##################### BACKFILL ################### diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index cf3b854773..8caf9f2285 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -5,6 +5,7 @@ from console_link.logic.metrics import get_metrics_source from console_link.logic.backfill import get_backfill from console_link.models.backfill_base import Backfill +from console_link.models.snapshot import Snapshot, S3Snapshot import yaml from cerberus import Validator @@ -17,6 +18,7 @@ "replayer": {"type": "dict", "required": False}, "backfill": {"type": "dict", "required": False}, "metrics_source": {"type": "dict", "required": False}, + "snapshot": {"type": "dict", "required": False}, } @@ -25,6 +27,7 @@ class Environment: target_cluster: Optional[Cluster] = None backfill: Optional[Backfill] = None metrics_source: Optional[MetricsSource] = None + snapshot: Optional[Snapshot] = None def __init__(self, config_file: str): self.config_file = config_file @@ -61,3 +64,11 @@ def __init__(self, config_file: str): logger.info(f"Backfill migration initialized: {self.backfill}") else: logger.info("No backfill provided") + + if 'snapshot' in self.config: + self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"], + source_cluster=self.source_cluster, + target_cluster=self.target_cluster) + logger.info(f"Snapshot initialized: {self.snapshot}") + else: + logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py index b7e34e0121..e2fd9c66c8 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py @@ -2,7 +2,7 @@ import json import logging from typing import Dict, Optional, Tuple - +from console_link.models.utils import ExitCode from console_link.models.backfill_osi import OpenSearchIngestionBackfill from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill from console_link.models.cluster import Cluster @@ -13,11 +13,6 @@ logger = logging.getLogger(__name__) -class ExitCode(Enum): - SUCCESS = 0 - FAILURE = 1 - - BackfillType = Enum("BackfillType", ["opensearch_ingestion", "reindex_from_snapshot"]) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py new file mode 100644 index 0000000000..37ab68b26c --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py @@ -0,0 +1,28 @@ +import logging +from typing import Tuple +from console_link.models.snapshot import Snapshot, SnapshotStatus + +logger = logging.getLogger(__name__) + +def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: + logger.info(f"Creating snapshot with {args=} and {kwargs=}") + try: + result = snapshot.create(*args, **kwargs) + except Exception as e: + logger.error(f"Failed to create snapshot: {e}") + return SnapshotStatus.FAILED, f"Failure when creating snapshot: {type(e).__name__} {e}" + + if result.success: + return SnapshotStatus.COMPLETED, "Snapshot created successfully." + "\n" + result.value + return SnapshotStatus.FAILED, "Snapshot creation failed." + "\n" + result.value + +def status(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]: + logger.info("Getting snapshot status") + try: + result = snapshot.status(*args, **kwargs) + except Exception as e: + logger.error(f"Failed to get status of snapshot: {e}") + return SnapshotStatus.FAILED, f"Failure when getting status of snapshot: {type(e).__name__} {e}" + if result.success: + return SnapshotStatus.COMPLETED, result.value + return SnapshotStatus.FAILED, "Snapshot status retrieval failed." + "\n" + result.value diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py new file mode 100644 index 0000000000..72ca5de6ca --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -0,0 +1,85 @@ +from abc import ABC, abstractmethod +from enum import Enum +import logging +import subprocess +from typing import Dict, Optional, Tuple +from console_link.models.cluster import Cluster +from console_link.models.command_result import CommandResult +from cerberus import Validator + +logger = logging.getLogger(__name__) + +SnapshotStatus = Enum( + "SnapshotStatus", [ + "NOT_STARTED", + "RUNNING", + "COMPLETED", + "FAILED" + ]) + +class Snapshot(ABC): + """ + Interface for creating and managing snapshots. + """ + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + self.config = config + self.source_cluster = source_cluster + self.target_cluster = target_cluster + + @abstractmethod + def create(self, *args, **kwargs) -> CommandResult: + """Create a snapshot.""" + pass + + @abstractmethod + def status(self, *args, **kwargs) -> CommandResult: + """Get the status of the snapshot.""" + pass + + +S3_SNAPSHOT_SCHEMA = { + 'snapshot_name': { + 'type': 'string', + 'required': True + }, + 's3_repo_uri': { + 'type': 'string', + 'required': True + }, + 's3_region': { + 'type': 'string', + 'required': True + } +} + +class S3Snapshot(Snapshot): + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + super().__init__(config, source_cluster, target_cluster) + v = Validator(S3_SNAPSHOT_SCHEMA) + if not v.validate(config): + raise ValueError("Invalid config file for snapshot", v.errors) + self.snapshot_name = config['snapshot_name'] + self.s3_repo_uri = config['s3_repo_uri'] + self.s3_region = config['s3_region'] + + def create(self, *args, **kwargs) -> CommandResult: + command = [ + "/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", self.snapshot_name, + "--s3-repo-uri", self.s3_repo_uri, + "--s3-region", self.s3_region, + "--source-host", self.source_cluster.endpoint, + "--target-host", self.target_cluster.endpoint, + ] + if self.source_cluster.allow_insecure or self.target_cluster.allow_insecure: + command.append("--insecure") + logger.info(f"Creating snapshot with command: {' '.join(command)}") + try: + subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) + logger.info(f"Snapshot {self.config['snapshot_name']} created successfully") + return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create snapshot: {str(e)}") + return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)} {e.output} {e.stderr}") + def status(self, *args, **kwargs) -> CommandResult: + return CommandResult(success=False, value=f"Command not implemented") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py index 7af59c9253..873bd5d9ba 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py @@ -1,4 +1,5 @@ # define a custom exception for aws api errors +from enum import Enum from typing import Dict @@ -20,3 +21,7 @@ def raise_for_aws_api_error(response: Dict) -> None: "Non-2XX status code received", status_code=status_code ) + +class ExitCode(Enum): + SUCCESS = 0 + FAILURE = 1 \ No newline at end of file diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 85ed9d80af..5219923796 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -1,5 +1,5 @@ source_cluster: - endpoint: "https://capture-proxy-es:9200" + endpoint: "https://capture-proxy:9200" allow_insecure: true basic_auth: username: "admin" @@ -16,3 +16,7 @@ metrics_source: backfill: reindex_from_snapshot: docker: +snapshot: + snapshot_name: "snapshot_2023_01_01" + s3_repo_uri: "s3://my-snapshot-bucket" + s3_region: "us-east-2" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index bb85e1a3f1..3158751990 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -1,4 +1,5 @@ import pathlib +from console_link.models.command_result import CommandResult import requests_mock @@ -66,6 +67,35 @@ def test_cli_with_backfill_describe(runner, env, mocker): assert result.exit_code == 0 +def test_cli_snapshot_create(runner, env, mocker): + mock_create = mocker.patch('console_link.models.snapshot.S3Snapshot.create') + + # Set the mock return values + mock_create.return_value = CommandResult(success=True, value="Snapshot created successfully") + + # Test snapshot creation + result_create = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'create'], catch_exceptions=True) + assert result_create.exit_code == 0 + assert "Snapshot created successfully" in result_create.output + + # Ensure the mocks were called + mock_create.assert_called_once() + +@pytest.mark.skip(reason="Not implemented yet") +def test_cli_snapshot_status(runner, env, mocker): + mock_status = mocker.patch('console_link.models.snapshot.S3Snapshot.status') + + mock_status.return_value = CommandResult(success=True, value="Snapshot status: COMPLETED") + + # Test snapshot status + result_status = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'status'], catch_exceptions=True) + assert result_status.exit_code == 0 + assert "Snapshot status: COMPLETED" in result_status.output + + # Ensure the mocks were called + mock_status.assert_called_once() + + source_cat_indices = """ green open logs-221998 pKNVNlhcRuuUXlwPJag9Kg 5 0 1000 0 167.4kb 167.4kb green open geonames DlT1Qp-7SqaARuECxTaYvw 5 0 1000 0 343.2kb 343.2kb @@ -95,3 +125,4 @@ def test_cli_cat_indices_e2e(runner, env): assert 'TARGET CLUSTER' in result.output assert source_cat_indices in result.output assert target_cat_indices in result.output +