Skip to content

Commit

Permalink
Merge pull request #751 from AndreKurait/console-lib-snapshot-create
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreKurait authored Jun 20, 2024
2 parents 89b7dac + 11dc524 commit 5413b91
Show file tree
Hide file tree
Showing 17 changed files with 305 additions and 28 deletions.
12 changes: 10 additions & 2 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public static class Args {
@Parameter(names = {"--source-password"}, description = "Optional. The source password; if not provided, will assume no auth on source", required = false)
public String sourcePass = null;

@Parameter(names = {"--source-insecure"}, description = "Allow untrusted SSL certificates for source", required = false)
public boolean sourceInsecure = false;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

Expand All @@ -48,6 +51,9 @@ public static class Args {

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--target-insecure"}, description = "Allow untrusted SSL certificates for target", required = false)
public boolean targetInsecure = false;
}

public static void main(String[] args) throws Exception {
Expand All @@ -67,9 +73,11 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final boolean sourceInsecure = arguments.sourceInsecure;
final boolean targetInsecure = arguments.targetInsecure;

final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, sourceInsecure);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, targetInsecure);

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");
Expand Down
6 changes: 6 additions & 0 deletions RFS/src/main/java/com/rfs/common/ConnectionDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ public static enum Protocol {
public final String username;
public final String password;
public final AuthType authType;
public final boolean insecure;

public ConnectionDetails(String url, String username, String password) {
this(url, username, password, false);
}

public ConnectionDetails(String url, String username, String password, boolean insecure) {
this.url = url; // http://localhost:9200
this.insecure = insecure;

// If the username is provided, the password must be as well, and vice versa
if ((username == null && password != null) || (username != null && password == null)) {
Expand Down
48 changes: 38 additions & 10 deletions RFS/src/main/java/com/rfs/common/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

import java.util.Base64;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.SneakyThrows;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.SslProvider;
import reactor.netty.ByteBufMono;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

public class RestClient {
public static class Response {
Expand All @@ -22,20 +29,41 @@ public Response(int responseCode, String responseBody, String responseMessage) {
public final ConnectionDetails connectionDetails;
private final HttpClient client;

@SneakyThrows
public RestClient(ConnectionDetails connectionDetails) {
this.connectionDetails = connectionDetails;

SslProvider sslProvider;
if (connectionDetails.insecure) {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();

sslProvider = SslProvider.builder()
.sslContext(sslContext)
.handlerConfigurator(sslHandler -> {
SSLEngine engine = sslHandler.engine();
SSLParameters sslParameters = engine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm(null);
engine.setSSLParameters(sslParameters);
})
.build();
} else {
sslProvider = SslProvider.defaultClientProvider();
}

this.client = HttpClient.create()
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("User-Agent", "RfsWorker-1.0");
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = connectionDetails.username + ":" + connectionDetails.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.add("Authorization", "Basic " + encodedCredentials);
}
});
.secure(sslProvider)
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("User-Agent", "RfsWorker-1.0");
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = connectionDetails.username + ":" + connectionDetails.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.add("Authorization", "Basic " + encodedCredentials);
}
});
}

public Mono<Response> getAsync(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import console_link.logic.clusters as logic_clusters
import console_link.logic.metrics as logic_metrics
import console_link.logic.backfill as logic_backfill
from console_link.logic.backfill import ExitCode
import console_link.logic.snapshot as logic_snapshot

from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
from console_link.models.snapshot import SnapshotStatus

import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -88,12 +92,43 @@ def replayer_group(ctx):
def start_replayer_cmd(ctx):
ctx.env.replayer.start()

# ##################### SNAPSHOT ###################


@cli.group(name="snapshot")
@click.pass_obj
def snapshot_group(ctx):
"""All actions related to snapshot creation"""
if ctx.env.snapshot is None:
raise click.UsageError("Snapshot is not set")


@snapshot_group.command(name="create")
@click.pass_obj
def create_snapshot_cmd(ctx):
"""Create a snapshot of the source cluster"""
snapshot = ctx.env.snapshot
status, message = logic_snapshot.create(snapshot)
if status != SnapshotStatus.COMPLETED:
raise click.ClickException(message)
click.echo(message)


@snapshot_group.command(name="status")
@click.pass_obj
def status_snapshot_cmd(ctx):
"""Check the status of the snapshot"""
snapshot = ctx.env.snapshot
_, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster,
target_cluster=ctx.env.target_cluster)
click.echo(message)

# ##################### BACKFILL ###################

# As we add other forms of backfill migrations, we should incorporate a way to dynamically allow different sets of
# arguments depending on the type of backfill migration


@cli.group(name="backfill")
@click.pass_obj
def backfill_group(ctx):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from console_link.logic.metrics import get_metrics_source
from console_link.logic.backfill import get_backfill
from console_link.models.backfill_base import Backfill
from console_link.models.snapshot import Snapshot, S3Snapshot
import yaml
from cerberus import Validator

Expand All @@ -17,6 +18,7 @@
"replayer": {"type": "dict", "required": False},
"backfill": {"type": "dict", "required": False},
"metrics_source": {"type": "dict", "required": False},
"snapshot": {"type": "dict", "required": False},
}


Expand All @@ -25,6 +27,7 @@ class Environment:
target_cluster: Optional[Cluster] = None
backfill: Optional[Backfill] = None
metrics_source: Optional[MetricsSource] = None
snapshot: Optional[Snapshot] = None

def __init__(self, config_file: str):
self.config_file = config_file
Expand Down Expand Up @@ -61,3 +64,11 @@ def __init__(self, config_file: str):
logger.info(f"Backfill migration initialized: {self.backfill}")
else:
logger.info("No backfill provided")

if 'snapshot' in self.config:
self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"],
source_cluster=self.source_cluster,
target_cluster=self.target_cluster)
logger.info(f"Snapshot initialized: {self.snapshot}")
else:
logger.info("No snapshot provided")
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from typing import Dict, Optional, Tuple

from console_link.models.utils import ExitCode
from console_link.models.backfill_osi import OpenSearchIngestionBackfill
from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill
from console_link.models.cluster import Cluster
Expand All @@ -13,11 +13,6 @@
logger = logging.getLogger(__name__)


class ExitCode(Enum):
SUCCESS = 0
FAILURE = 1


BackfillType = Enum("BackfillType",
["opensearch_ingestion", "reindex_from_snapshot"])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import logging
from typing import Tuple
from console_link.models.snapshot import Snapshot, SnapshotStatus

logger = logging.getLogger(__name__)


def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]:
logger.info(f"Creating snapshot with {args=} and {kwargs=}")
try:
result = snapshot.create(*args, **kwargs)
except Exception as e:
logger.error(f"Failed to create snapshot: {e}")
return SnapshotStatus.FAILED, f"Failure when creating snapshot: {type(e).__name__} {e}"

if result.success:
return SnapshotStatus.COMPLETED, "Snapshot created successfully." + "\n" + result.value
return SnapshotStatus.FAILED, "Snapshot creation failed." + "\n" + result.value


def status(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]:
logger.info("Getting snapshot status")
try:
result = snapshot.status(*args, **kwargs)
except Exception as e:
logger.error(f"Failed to get status of snapshot: {e}")
return SnapshotStatus.FAILED, f"Failure when getting status of snapshot: {type(e).__name__} {e}"
if result.success:
return SnapshotStatus.COMPLETED, result.value
return SnapshotStatus.FAILED, "Snapshot status retrieval failed." + "\n" + result.value
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class Cluster:
aws_secret_arn: Optional[str] = None
auth_type: Optional[AuthMethod] = None
auth_details: Optional[Dict[str, Any]] = None
allow_insecure: bool = None

def __init__(self, config: Dict) -> None:
logger.info(f"Initializing cluster with config: {config}")
Expand All @@ -74,8 +75,7 @@ def __init__(self, config: Dict) -> None:
raise ValueError("Invalid config file for cluster", v.errors)

self.endpoint = config["endpoint"]
if self.endpoint.startswith("https"):
self.allow_insecure = config.get("allow_insecure", False)
self.allow_insecure = config.get("allow_insecure", False)
if 'no_auth' in config:
self.auth_type = AuthMethod.NO_AUTH
elif 'basic_auth' in config:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from abc import ABC, abstractmethod
from enum import Enum
import logging
import subprocess
from typing import Dict, Optional
from console_link.models.cluster import Cluster
from console_link.models.command_result import CommandResult
from cerberus import Validator

logger = logging.getLogger(__name__)

SnapshotStatus = Enum(
"SnapshotStatus", [
"NOT_STARTED",
"RUNNING",
"COMPLETED",
"FAILED"])


class Snapshot(ABC):
"""
Interface for creating and managing snapshots.
"""
def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None:
self.config = config
self.source_cluster = source_cluster
self.target_cluster = target_cluster

@abstractmethod
def create(self, *args, **kwargs) -> CommandResult:
"""Create a snapshot."""
pass

@abstractmethod
def status(self, *args, **kwargs) -> CommandResult:
"""Get the status of the snapshot."""
pass


S3_SNAPSHOT_SCHEMA = {
'snapshot_name': {
'type': 'string',
'required': True
},
's3_repo_uri': {
'type': 'string',
'required': True
},
's3_region': {
'type': 'string',
'required': True
}
}


class S3Snapshot(Snapshot):
def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None:
super().__init__(config, source_cluster, target_cluster)
v = Validator(S3_SNAPSHOT_SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for snapshot", v.errors)
self.snapshot_name = config['snapshot_name']
self.s3_repo_uri = config['s3_repo_uri']
self.s3_region = config['s3_region']

def create(self, *args, **kwargs) -> CommandResult:
command = [
"/root/createSnapshot/bin/CreateSnapshot",
"--snapshot-name", self.snapshot_name,
"--s3-repo-uri", self.s3_repo_uri,
"--s3-region", self.s3_region,
"--source-host", self.source_cluster.endpoint,
"--target-host", self.target_cluster.endpoint,
]

if self.source_cluster.allow_insecure:
command.append("--source-insecure")
if self.target_cluster.allow_insecure:
command.append("--target-insecure")

logger.info(f"Creating snapshot with command: {' '.join(command)}")
try:
# Pass None to stdout and stderr to not capture output and show in terminal
subprocess.run(command, stdout=None, stderr=None, text=True, check=True)
logger.info(f"Snapshot {self.config['snapshot_name']} created successfully")
return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to create snapshot: {str(e)}")
return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}")

def status(self, *args, **kwargs) -> CommandResult:
return CommandResult(success=False, value="Command not implemented")
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# define a custom exception for aws api errors
from enum import Enum
from typing import Dict


Expand All @@ -20,3 +21,8 @@ def raise_for_aws_api_error(response: Dict) -> None:
"Non-2XX status code received",
status_code=status_code
)


class ExitCode(Enum):
SUCCESS = 0
FAILURE = 1
Loading

0 comments on commit 5413b91

Please sign in to comment.