Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial migration console snapshot support #751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public static class Args {
@Parameter(names = {"--source-password"}, description = "Optional. The source password; if not provided, will assume no auth on source", required = false)
public String sourcePass = null;

@Parameter(names = {"--source-insecure"}, description = "Allow untrusted SSL certificates for source", required = false)
public boolean sourceInsecure = false;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

Expand All @@ -48,6 +51,9 @@ public static class Args {

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--target-insecure"}, description = "Allow untrusted SSL certificates for target", required = false)
public boolean targetInsecure = false;
}

public static void main(String[] args) throws Exception {
Expand All @@ -67,9 +73,11 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final boolean sourceInsecure = arguments.sourceInsecure;
final boolean targetInsecure = arguments.targetInsecure;

final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, sourceInsecure);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, targetInsecure);

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");
Expand Down
6 changes: 6 additions & 0 deletions RFS/src/main/java/com/rfs/common/ConnectionDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ public static enum Protocol {
public final String username;
public final String password;
public final AuthType authType;
public final boolean insecure;

public ConnectionDetails(String url, String username, String password) {
this(url, username, password, false);
}

public ConnectionDetails(String url, String username, String password, boolean insecure) {
this.url = url; // http://localhost:9200
this.insecure = insecure;

// If the username is provided, the password must be as well, and vice versa
if ((username == null && password != null) || (username != null && password == null)) {
Expand Down
48 changes: 38 additions & 10 deletions RFS/src/main/java/com/rfs/common/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

import java.util.Base64;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.SneakyThrows;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.SslProvider;
import reactor.netty.ByteBufMono;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

public class RestClient {
public static class Response {
Expand All @@ -22,20 +29,41 @@
public final ConnectionDetails connectionDetails;
private final HttpClient client;

@SneakyThrows

Check warning on line 32 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L32

Added line #L32 was not covered by tests
public RestClient(ConnectionDetails connectionDetails) {
this.connectionDetails = connectionDetails;

SslProvider sslProvider;
if (connectionDetails.insecure) {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();

Check warning on line 40 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L38-L40

Added lines #L38 - L40 were not covered by tests

sslProvider = SslProvider.builder()
.sslContext(sslContext)
.handlerConfigurator(sslHandler -> {
SSLEngine engine = sslHandler.engine();
SSLParameters sslParameters = engine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm(null);
engine.setSSLParameters(sslParameters);
})
.build();
} else {
sslProvider = SslProvider.defaultClientProvider();

Check warning on line 52 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L42-L52

Added lines #L42 - L52 were not covered by tests
}

this.client = HttpClient.create()
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("User-Agent", "RfsWorker-1.0");
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = connectionDetails.username + ":" + connectionDetails.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.add("Authorization", "Basic " + encodedCredentials);
}
});
.secure(sslProvider)
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("User-Agent", "RfsWorker-1.0");

Check warning on line 60 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L56-L60

Added lines #L56 - L60 were not covered by tests
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = connectionDetails.username + ":" + connectionDetails.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.add("Authorization", "Basic " + encodedCredentials);

Check warning on line 64 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L62-L64

Added lines #L62 - L64 were not covered by tests
}
});

Check warning on line 66 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L66

Added line #L66 was not covered by tests
}

public Mono<Response> getAsync(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import console_link.logic.clusters as logic_clusters
import console_link.logic.metrics as logic_metrics
import console_link.logic.backfill as logic_backfill
from console_link.logic.backfill import ExitCode
import console_link.logic.snapshot as logic_snapshot

from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
from console_link.models.snapshot import SnapshotStatus

import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -88,12 +92,43 @@
def start_replayer_cmd(ctx):
ctx.env.replayer.start()

# ##################### SNAPSHOT ###################


@cli.group(name="snapshot")
@click.pass_obj
def snapshot_group(ctx):
"""All actions related to snapshot creation"""
if ctx.env.snapshot is None:
raise click.UsageError("Snapshot is not set")

Check warning on line 103 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L103

Added line #L103 was not covered by tests


@snapshot_group.command(name="create")
@click.pass_obj
def create_snapshot_cmd(ctx):
"""Create a snapshot of the source cluster"""
snapshot = ctx.env.snapshot
status, message = logic_snapshot.create(snapshot)
if status != SnapshotStatus.COMPLETED:
raise click.ClickException(message)

Check warning on line 113 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L113

Added line #L113 was not covered by tests
click.echo(message)


@snapshot_group.command(name="status")
@click.pass_obj
def status_snapshot_cmd(ctx):
"""Check the status of the snapshot"""
snapshot = ctx.env.snapshot
_, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster,

Check warning on line 122 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L121-L122

Added lines #L121 - L122 were not covered by tests
target_cluster=ctx.env.target_cluster)
click.echo(message)

Check warning on line 124 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L124

Added line #L124 was not covered by tests

# ##################### BACKFILL ###################

# As we add other forms of backfill migrations, we should incorporate a way to dynamically allow different sets of
# arguments depending on the type of backfill migration


@cli.group(name="backfill")
@click.pass_obj
def backfill_group(ctx):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from console_link.logic.metrics import get_metrics_source
from console_link.logic.backfill import get_backfill
from console_link.models.backfill_base import Backfill
from console_link.models.snapshot import Snapshot, S3Snapshot
import yaml
from cerberus import Validator

Expand All @@ -17,6 +18,7 @@
"replayer": {"type": "dict", "required": False},
"backfill": {"type": "dict", "required": False},
"metrics_source": {"type": "dict", "required": False},
"snapshot": {"type": "dict", "required": False},
}


Expand All @@ -25,6 +27,7 @@ class Environment:
target_cluster: Optional[Cluster] = None
backfill: Optional[Backfill] = None
metrics_source: Optional[MetricsSource] = None
snapshot: Optional[Snapshot] = None

def __init__(self, config_file: str):
self.config_file = config_file
Expand Down Expand Up @@ -61,3 +64,11 @@ def __init__(self, config_file: str):
logger.info(f"Backfill migration initialized: {self.backfill}")
else:
logger.info("No backfill provided")

if 'snapshot' in self.config:
self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Down the line, we likely want to clean this up and use a factory to give us a s3 snapshot vs local vs whatever, but I think it's good for now.

source_cluster=self.source_cluster,
target_cluster=self.target_cluster)
logger.info(f"Snapshot initialized: {self.snapshot}")
else:
logger.info("No snapshot provided")
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from typing import Dict, Optional, Tuple

from console_link.models.utils import ExitCode
from console_link.models.backfill_osi import OpenSearchIngestionBackfill
from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill
from console_link.models.cluster import Cluster
Expand All @@ -13,11 +13,6 @@
logger = logging.getLogger(__name__)


class ExitCode(Enum):
SUCCESS = 0
FAILURE = 1


BackfillType = Enum("BackfillType",
["opensearch_ingestion", "reindex_from_snapshot"])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging
from typing import Tuple
from console_link.models.snapshot import Snapshot, SnapshotStatus

logger = logging.getLogger(__name__)


def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]:
logger.info(f"Creating snapshot with {args=} and {kwargs=}")
try:
result = snapshot.create(*args, **kwargs)
except Exception as e:
logger.error(f"Failed to create snapshot: {e}")
return SnapshotStatus.FAILED, f"Failure when creating snapshot: {type(e).__name__} {e}"

Check warning on line 14 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py#L9-L14

Added lines #L9 - L14 were not covered by tests

if result.success:
return SnapshotStatus.COMPLETED, "Snapshot created successfully." + "\n" + result.value
return SnapshotStatus.FAILED, "Snapshot creation failed." + "\n" + result.value

Check warning on line 18 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py#L16-L18

Added lines #L16 - L18 were not covered by tests


def status(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]:
logger.info("Getting snapshot status")
try:
result = snapshot.status(*args, **kwargs)
except Exception as e:
logger.error(f"Failed to get status of snapshot: {e}")
return SnapshotStatus.FAILED, f"Failure when getting status of snapshot: {type(e).__name__} {e}"
if result.success:
return SnapshotStatus.COMPLETED, result.value
return SnapshotStatus.FAILED, "Snapshot status retrieval failed." + "\n" + result.value

Check warning on line 30 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/snapshot.py#L22-L30

Added lines #L22 - L30 were not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class Cluster:
aws_secret_arn: Optional[str] = None
auth_type: Optional[AuthMethod] = None
auth_details: Optional[Dict[str, Any]] = None
allow_insecure: bool = None

def __init__(self, config: Dict) -> None:
logger.info(f"Initializing cluster with config: {config}")
Expand All @@ -74,8 +75,7 @@ def __init__(self, config: Dict) -> None:
raise ValueError("Invalid config file for cluster", v.errors)

self.endpoint = config["endpoint"]
if self.endpoint.startswith("https"):
self.allow_insecure = config.get("allow_insecure", False)
self.allow_insecure = config.get("allow_insecure", False)
if 'no_auth' in config:
self.auth_type = AuthMethod.NO_AUTH
elif 'basic_auth' in config:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from abc import ABC, abstractmethod
from enum import Enum
import logging
import subprocess
from typing import Dict, Optional
from console_link.models.cluster import Cluster
from console_link.models.command_result import CommandResult
from cerberus import Validator

logger = logging.getLogger(__name__)

SnapshotStatus = Enum(
"SnapshotStatus", [
"NOT_STARTED",
"RUNNING",
"COMPLETED",
"FAILED"])


class Snapshot(ABC):
"""
Interface for creating and managing snapshots.
"""
def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None:
self.config = config
self.source_cluster = source_cluster
self.target_cluster = target_cluster

@abstractmethod
def create(self, *args, **kwargs) -> CommandResult:
"""Create a snapshot."""
pass

Check warning on line 32 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L32

Added line #L32 was not covered by tests

@abstractmethod
def status(self, *args, **kwargs) -> CommandResult:
"""Get the status of the snapshot."""
pass

Check warning on line 37 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L37

Added line #L37 was not covered by tests


S3_SNAPSHOT_SCHEMA = {
'snapshot_name': {
'type': 'string',
'required': True
},
's3_repo_uri': {
'type': 'string',
'required': True
},
's3_region': {
'type': 'string',
'required': True
}
}


class S3Snapshot(Snapshot):
def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None:
super().__init__(config, source_cluster, target_cluster)
v = Validator(S3_SNAPSHOT_SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for snapshot", v.errors)

Check warning on line 61 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L61

Added line #L61 was not covered by tests
self.snapshot_name = config['snapshot_name']
self.s3_repo_uri = config['s3_repo_uri']
self.s3_region = config['s3_region']

def create(self, *args, **kwargs) -> CommandResult:
command = [

Check warning on line 67 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L67

Added line #L67 was not covered by tests
"/root/createSnapshot/bin/CreateSnapshot",
"--snapshot-name", self.snapshot_name,
"--s3-repo-uri", self.s3_repo_uri,
"--s3-region", self.s3_region,
"--source-host", self.source_cluster.endpoint,
"--target-host", self.target_cluster.endpoint,
]

if self.source_cluster.allow_insecure:
command.append("--source-insecure")
if self.target_cluster.allow_insecure:
command.append("--target-insecure")

Check warning on line 79 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L76-L79

Added lines #L76 - L79 were not covered by tests

logger.info(f"Creating snapshot with command: {' '.join(command)}")
try:

Check warning on line 82 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L81-L82

Added lines #L81 - L82 were not covered by tests
# Pass None to stdout and stderr to not capture output and show in terminal
subprocess.run(command, stdout=None, stderr=None, text=True, check=True)
logger.info(f"Snapshot {self.config['snapshot_name']} created successfully")
return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I kind of forgot that we don't have to do any of the s3 stuff ourselves! 👍

except subprocess.CalledProcessError as e:
logger.error(f"Failed to create snapshot: {str(e)}")
return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}")

Check warning on line 89 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L84-L89

Added lines #L84 - L89 were not covered by tests

def status(self, *args, **kwargs) -> CommandResult:
return CommandResult(success=False, value="Command not implemented")

Check warning on line 92 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py#L92

Added line #L92 was not covered by tests
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# define a custom exception for aws api errors
from enum import Enum
from typing import Dict


Expand All @@ -20,3 +21,8 @@ def raise_for_aws_api_error(response: Dict) -> None:
"Non-2XX status code received",
status_code=status_code
)


class ExitCode(Enum):
SUCCESS = 0
FAILURE = 1
Loading
Loading