From b5559fefc72df7923c03390dd0df20e28676b2e0 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Thu, 20 Jun 2024 12:53:59 -0600 Subject: [PATCH 01/20] Add outline of metadata implementation Signed-off-by: Mikayla Thompson --- .../console_link/console_link/environment.py | 8 ++ .../console_link/logic/metadata.py | 8 ++ .../console_link/models/metadata.py | 56 +++++++++++ .../console_link/models/schema_tools.py | 10 ++ .../lib/console_link/services.yaml | 30 ++++-- .../lib/console_link/tests/data/services.yaml | 21 +++-- .../lib/console_link/tests/test_metadata.py | 92 +++++++++++++++++++ 7 files changed, 213 insertions(+), 12 deletions(-) create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index d3b83b470..ad7cfc4a6 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -9,6 +9,8 @@ import yaml from cerberus import Validator +from console_link.models.metadata import Metadata + logger = logging.getLogger(__name__) @@ -19,6 +21,7 @@ "backfill": {"type": "dict", "required": False}, "metrics_source": {"type": "dict", "required": False}, "snapshot": {"type": "dict", "required": False}, + "metadata_migration": {"type": "dict", "required": False} } @@ -28,6 +31,7 @@ class Environment: backfill: Optional[Backfill] = None metrics_source: Optional[MetricsSource] = None snapshot: Optional[Snapshot] = None + metadata: Optional[Metadata] = None def __init__(self, config_file: str): self.config_file = config_file @@ -72,3 +76,7 @@ def __init__(self, config_file: str): logger.info(f"Snapshot initialized: {self.snapshot}") else: logger.info("No snapshot provided") + if 'metadata_migration' in self.config: + self.metadata: Metadata = Metadata(self.config["metadata_migration"], + target_cluster=self.target_cluster, + snapshot=self.snapshot) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py new file mode 100644 index 000000000..a72dd25ac --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py @@ -0,0 +1,8 @@ +from typing import Tuple + +from console_link.models.metadata import Metadata +from console_link.models.utils import ExitCode + + +def migrate(metadata: Metadata) -> Tuple[ExitCode, str]: + return ExitCode.SUCCESS, metadata.migrate() diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py new file mode 100644 index 000000000..67a70ed8f --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py @@ -0,0 +1,56 @@ +from typing import Optional +from cerberus import Validator + +from console_link.models.command_result import CommandResult +from console_link.models.schema_tools import list_schema +from console_link.models.cluster import Cluster +from console_link.models.snapshot import Snapshot + + +FROM_SNAPSHOT_SCHEMA = { + "type": "dict", + # In the future, there should be a "from_snapshot" and a "from_live_cluster" option, but for now only snapshot is + # supported, so this is required. It _can_ be null, but that requires a snapshot to defined on its own in + # the services.yaml file. + "required": True, + "nullable": True, + "schema": { + "snapshot_name": {"type": "string", "required": True}, + "local_dir": {"type": "string", "required": False}, + "s3": { + "type": "dict", + # Again, in the future there should be a 'local' option, but for now this block is required if the + # snapshot details are being specified. + "required": True, + "schema": { + "repo_uri": {"type": "string", "required": True}, + "aws_region": {"type": "string", "required": True} + } + }, + } +} + +SCHEMA = { + "from_snapshot": FROM_SNAPSHOT_SCHEMA, + "min_replicas": {"type": "integer", "min": 0, "required": False}, + "index_allowlist": list_schema(required=False), + "index_template_allowlist": list_schema(required=False), + "component-template-allowlist": list_schema(required=False) +} + + +class Metadata: + def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] = None): + v = Validator(SCHEMA) + if not v.validate(config): + raise ValueError(v.errors) + self._config = config + self._target_cluster = target_cluster + self._snapshot = snapshot + + if (not self._snapshot) and (config["from_snapshot"] is None): + raise ValueError("No snapshot is specified or can be assumed " + "for the metadata migration to use.") + + def migrate(self) -> CommandResult: + pass diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/schema_tools.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/schema_tools.py index 8ffc02288..432c66491 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/schema_tools.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/schema_tools.py @@ -12,3 +12,13 @@ def one_of(field, value, error): elif len(found_objects) < 1: error(field, f"No values are present from set: {sorted(values_to_restrict)}") return one_of + + +def list_schema(required=False, list_member_type="string") -> dict: + return { + 'type': 'list', + 'required': required, + 'schema': { + 'type': list_member_type, + } + } diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 521992379..9a272dd14 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -2,21 +2,39 @@ source_cluster: endpoint: "https://capture-proxy:9200" allow_insecure: true basic_auth: - username: "admin" - password: "admin" + username: "admin" + password: "admin" target_cluster: endpoint: "https://opensearchtarget:9200" allow_insecure: true basic_auth: - username: "admin" - password: "myStrongPassword123!" + username: "admin" + password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" backfill: - reindex_from_snapshot: - docker: + reindex_from_snapshot: + docker: snapshot: snapshot_name: "snapshot_2023_01_01" s3_repo_uri: "s3://my-snapshot-bucket" s3_region: "us-east-2" + reindex_from_snapshot: + docker: +metadata_migration: + from_snapshot: # If not provided, these are assumed from the snapshot object + local_dir: "/tmp/s3" + # snapshot_name: "reindex_from_snapshot" + # s3: + # repo_uri: "s3://my-bucket" + # local_dir: "/tmp/s3" + # aws_region: "us-east-1" + # lucene_dir: "/tmp/lucene" + min_replicas: 0 + index_allowlist: + - "my_index" + index_template_allowlist: + - "my_index_template" + component-template-allowlist: + - "my_component_template" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml index abca9d89a..c7403d785 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml @@ -2,21 +2,30 @@ source_cluster: endpoint: "https://capture-proxy-es:9200" allow_insecure: true basic_auth: - username: "admin" - password: "admin" + username: "admin" + password: "admin" target_cluster: endpoint: "https://opensearchtarget:9200" allow_insecure: true basic_auth: - username: "admin" - password: "myStrongPassword123!" + username: "admin" + password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" backfill: - reindex_from_snapshot: - docker: + reindex_from_snapshot: + docker: snapshot: snapshot_name: "test_snapshot" s3_repo_uri: "s3://test-bucket" s3_region: "us-east-2" +metadata_migration: + from_snapshot: # If not provided, these are assumed from the snapshot object + min_replicas: 0 + index_allowlist: + - "my_index" + index_template_allowlist: + - "my_index_template" + component-template-allowlist: + - "my_component_template" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py new file mode 100644 index 000000000..aeb2ee631 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py @@ -0,0 +1,92 @@ + + +import pytest +from console_link.models.metadata import Metadata +from console_link.models.snapshot import S3Snapshot +from tests.utils import create_valid_cluster + + +def test_metadata_init_with_fully_specified_config_succeeds(): + config = { + "from_snapshot": { + "local_dir": "/tmp/s3", + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + }, + "min_replicas": 1, + "index_allowlist": [ + "my_index", "my_second_index" + ], + "index_template_allowlist": [ + "my_index_template", "my_second_index_template" + ], + "component-template-allowlist": [ + "my_component_template", "my_second_component_template" + ] + } + metadata = Metadata(config, create_valid_cluster(), None) + assert metadata._config == config + assert metadata._snapshot is None + assert isinstance(metadata, Metadata) + + +def test_metadata_init_with_minimal_config_no_external_snapshot_succeeds(): + config = { + "from_snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + metadata = Metadata(config, create_valid_cluster(), None) + assert metadata._config == config + assert metadata._snapshot is None + assert isinstance(metadata, Metadata) + + +def test_metadata_init_with_missing_snapshot_config_no_external_snapshot_fails(): + config = { + "from_snapshot": None + } + with pytest.raises(ValueError) as excinfo: + Metadata(config, create_valid_cluster(), None) + assert "No snapshot is specified" in str(excinfo.value) + + +def test_metadata_init_with_partial_snapshot_config_no_external_snapshot_fails(): + config = { + "from_snapshot": { + "s3": { + "aws_region": "us-east-1" + }, + } + } + with pytest.raises(ValueError) as excinfo: + Metadata(config, create_valid_cluster(), None) + print(excinfo) + assert 'snapshot_name' in excinfo.value.args[0]['from_snapshot'][0] + assert 'required field' == excinfo.value.args[0]['from_snapshot'][0]['snapshot_name'][0] + + assert 'repo_uri' in excinfo.value.args[0]['from_snapshot'][0]['s3'][0] + assert 'required field' == excinfo.value.args[0]['from_snapshot'][0]['s3'][0]['repo_uri'][0] + + +def test_metadata_init_with_minimal_config_and_external_snapshot_succeeds(): + snapshot_config = { + "snapshot_name": "test_snapshot", + "s3_repo_uri": "s3://test-bucket", + "s3_region": "us-east-2" + } + snapshot = S3Snapshot(snapshot_config, create_valid_cluster) + config = { + "from_snapshot": None, + } + metadata = Metadata(config, create_valid_cluster(), snapshot) + assert metadata._config == config + assert metadata._snapshot == snapshot + assert isinstance(metadata, Metadata) From 5b12d5bc159c1840e3e9a7e3bc50ce2b49ecca06 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Thu, 20 Jun 2024 15:45:01 -0600 Subject: [PATCH 02/20] Add logic, implement migrate Signed-off-by: Mikayla Thompson --- .../console_link/logic/metadata.py | 13 ++- .../console_link/models/metadata.py | 99 ++++++++++++++++++- .../console_link/models/snapshot.py | 1 + .../lib/console_link/services.yaml | 11 +-- .../lib/console_link/tests/test_metadata.py | 49 ++++++++- 5 files changed, 154 insertions(+), 19 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py index a72dd25ac..3637446ae 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py @@ -2,7 +2,18 @@ from console_link.models.metadata import Metadata from console_link.models.utils import ExitCode +import logging + +logger = logging.getLogger(__name__) def migrate(metadata: Metadata) -> Tuple[ExitCode, str]: - return ExitCode.SUCCESS, metadata.migrate() + logger.info("Migrating metadata") + try: + result = metadata.migrate() + except Exception as e: + logger.error(f"Failed to migrate metadata: {e}") + return ExitCode.FAILURE, f"Failure when migrating metadata: {e}" + if result.success: + return ExitCode.SUCCESS, result.value + return ExitCode.FAILURE, result.value diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py index 67a70ed8f..2b24e3f7c 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py @@ -1,11 +1,15 @@ +import subprocess from typing import Optional from cerberus import Validator +import tempfile +import logging from console_link.models.command_result import CommandResult from console_link.models.schema_tools import list_schema -from console_link.models.cluster import Cluster -from console_link.models.snapshot import Snapshot +from console_link.models.cluster import AuthMethod, Cluster +from console_link.models.snapshot import S3Snapshot, Snapshot +logger = logging.getLogger(__name__) FROM_SNAPSHOT_SCHEMA = { "type": "dict", @@ -39,6 +43,10 @@ } +def generate_tmp_dir(name: str) -> str: + return tempfile.mkdtemp(prefix=f"migration-{name}-") + + class Metadata: def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] = None): v = Validator(SCHEMA) @@ -46,11 +54,92 @@ def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] raise ValueError(v.errors) self._config = config self._target_cluster = target_cluster - self._snapshot = snapshot - if (not self._snapshot) and (config["from_snapshot"] is None): + if not isinstance(snapshot, S3Snapshot): + # We can't currently use the snapshot info unless it's in S3. We need to ignore it. + logger.info("Ignoring externally specified snapshot info since it's not in S3.") + snapshot = None + + if (not snapshot) and (config["from_snapshot"] is None): raise ValueError("No snapshot is specified or can be assumed " "for the metadata migration to use.") + + self._min_replicas = config.get("min_replicas", 0) + self._index_allowlist = config.get("index_allowlist", None) + self._index_template_allowlist = config.get("index_template_allowlist", None) + self._component_template_allowlist = config.get("component-template-allowlist", None) + + # If `from_snapshot` is fully specified, use those values to define snapshot params + if config["from_snapshot"] is not None: + logger.info("Defining snapshot params for metadata migration from config file") + self._snapshot_name = config["from_snapshot"]["snapshot_name"] + if "local_dir" in config["from_snapshot"]: + self._local_dir = config["from_snapshot"]["local_dir"] + else: + self._local_dir = generate_tmp_dir(self._snapshot_name) + self._s3_uri = config["from_snapshot"]["s3"]["repo_uri"] + self._aws_region = config["from_snapshot"]["s3"]["aws_region"] + else: + assert snapshot is not None # This follows from the logic above, but I'm asserting it to make mypy happy + logger.info("Defining snapshot params for metadata migration from Snapshot object") + self._snapshot_name = snapshot.snapshot_name + self._local_dir = generate_tmp_dir(self._snapshot_name) + self._s3_uri = snapshot.s3_repo_uri + self._aws_region = snapshot.s3_region + + logger.debug(f"Snapshot name: {self._snapshot_name}") + logger.debug(f"Local dir: {self._local_dir}") + logger.debug(f"S3 URI: {self._s3_uri}") + logger.debug(f"AWS region: {self._aws_region}") + logger.debug(f"Min replicas: {self._min_replicas}") + logger.debug(f"Index allowlist: {self._index_allowlist}") + logger.debug(f"Index template allowlist: {self._index_template_allowlist}") + logger.debug(f"Component template allowlist: {self._component_template_allowlist}") + logger.info("Metadata migration configuration defined") def migrate(self) -> CommandResult: - pass + password_field_index = None + command = [ + "/root/metadataMigration/bin/MetadataMigration", + # Initially populate only the required params + "--snapshot-name", self._snapshot_name, + "--s3-local-dir", self._local_dir, + "--s3-repo-uri", self._s3_uri, + "--s3-region", self._aws_region, + "--target-host", self._target_cluster.endpoint, + "--min-replicas", str(self._min_replicas) + ] + if self._target_cluster.auth_details == AuthMethod.BASIC_AUTH: + try: + command.extend([ + "--target-username", self._target_cluster.auth_details.get("username"), + "--target-password", self._target_cluster.auth_details.get("password") + ]) + password_field_index = len(command) - 1 + logger.info("Using basic auth for target cluster") + except KeyError as e: + raise ValueError(f"Missing required auth details for target cluster: {e}") + + if self._index_allowlist: + command.extend(["--index-allowlist", ", ".join(self._index_allowlist)]) + + if self._index_template_allowlist: + command.extend(["--index-template-allowlist", ", ".join(self._index_template_allowlist)]) + + if self._component_template_allowlist: + command.extend(["--component-template-allowlist", ", ".join(self._component_template_allowlist)]) + + if password_field_index: + display_command = command[:password_field_index] + ["********"] + command[password_field_index:] + else: + display_command = command + logger.info(f"Creating snapshot with command: {' '.join(display_command)}") + + try: + # Pass None to stdout and stderr to not capture output and show in terminal + subprocess.Popen(command, stdout=None, stderr=None, text=True, start_new_session=True) + logger.info(f"Metadata migration for snapshot {self._snapshot_name} initiated") + return CommandResult(success=True, value="Metadata migration initiated") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create snapshot: {str(e)}") + return CommandResult(success=False, value=f"Failed to migrate metadata: {str(e)}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 690ad7f7b..3c55d2504 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -64,6 +64,7 @@ def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Option self.s3_region = config['s3_region'] def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) command = [ "/root/createSnapshot/bin/CreateSnapshot", "--snapshot-name", self.snapshot_name, diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 9a272dd14..7282f1a8c 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -20,17 +20,8 @@ snapshot: snapshot_name: "snapshot_2023_01_01" s3_repo_uri: "s3://my-snapshot-bucket" s3_region: "us-east-2" - reindex_from_snapshot: - docker: metadata_migration: - from_snapshot: # If not provided, these are assumed from the snapshot object - local_dir: "/tmp/s3" - # snapshot_name: "reindex_from_snapshot" - # s3: - # repo_uri: "s3://my-bucket" - # local_dir: "/tmp/s3" - # aws_region: "us-east-1" - # lucene_dir: "/tmp/lucene" + from_snapshot: min_replicas: 0 index_allowlist: - "my_index" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py index aeb2ee631..8b1db9e6d 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py @@ -29,7 +29,6 @@ def test_metadata_init_with_fully_specified_config_succeeds(): } metadata = Metadata(config, create_valid_cluster(), None) assert metadata._config == config - assert metadata._snapshot is None assert isinstance(metadata, Metadata) @@ -45,7 +44,6 @@ def test_metadata_init_with_minimal_config_no_external_snapshot_succeeds(): } metadata = Metadata(config, create_valid_cluster(), None) assert metadata._config == config - assert metadata._snapshot is None assert isinstance(metadata, Metadata) @@ -88,5 +86,50 @@ def test_metadata_init_with_minimal_config_and_external_snapshot_succeeds(): } metadata = Metadata(config, create_valid_cluster(), snapshot) assert metadata._config == config - assert metadata._snapshot == snapshot + assert metadata._snapshot_name == snapshot_config["snapshot_name"] assert isinstance(metadata, Metadata) + + +def test_metadata_init_with_partial_config_and_external_snapshot_fails(): + snapshot_config = { + "snapshot_name": "test_snapshot", + "s3_repo_uri": "s3://test-bucket", + "s3_region": "us-east-2" + } + snapshot = S3Snapshot(snapshot_config, create_valid_cluster) + config = { + "from_snapshot": { + "local_dir": "/tmp/s3", + "snapshot_name": "reindex_from_snapshot", + } + } + with pytest.raises(ValueError) as excinfo: + Metadata(config, create_valid_cluster(), snapshot) + print(excinfo) + assert 's3' in excinfo.value.args[0]['from_snapshot'][0] + assert 'required field' == excinfo.value.args[0]['from_snapshot'][0]['s3'][0] + + +def test_full_config_and_snapshot_gives_priority_to_config(): + snapshot_config = { + "snapshot_name": "test_snapshot", + "s3_repo_uri": "s3://test-bucket", + "s3_region": "us-east-2" + } + snapshot = S3Snapshot(snapshot_config, create_valid_cluster) + config = { + "from_snapshot": { + "local_dir": "/tmp/s3", + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + metadata = Metadata(config, create_valid_cluster(), snapshot) + assert isinstance(metadata, Metadata) + assert metadata._snapshot_name == config["from_snapshot"]["snapshot_name"] + assert metadata._s3_uri == config["from_snapshot"]["s3"]["repo_uri"] + assert metadata._aws_region == config["from_snapshot"]["s3"]["aws_region"] + assert metadata._local_dir == config["from_snapshot"]["local_dir"] From 40bb29154a57ce051253baec507f4ac937c2dd10 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Fri, 21 Jun 2024 11:03:48 -0600 Subject: [PATCH 03/20] Add more metadata stuff Signed-off-by: Mikayla Thompson --- .../main/java/com/rfs/MetadataMigration.java | 6 +++- .../src/main/docker/docker-compose.yml | 12 +++++++ .../lib/console_link/console_link/cli.py | 20 ++++++++++++ .../console_link/models/cluster.py | 2 +- .../console_link/models/metadata.py | 3 ++ .../console_link/models/snapshot.py | 13 +++++++- .../lib/console_link/services.yaml | 32 +++++++++---------- 7 files changed, 68 insertions(+), 20 deletions(-) diff --git a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java index 35cc8bbca..b499595e6 100644 --- a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java +++ b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java @@ -56,6 +56,9 @@ public static class Args { @Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false) public String targetPass = null; + @Parameter(names = {"--target-insecure"}, description = "Allow untrusted SSL certificates for target", required = false) + public boolean targetInsecure = false; + @Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate" + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false) public List indexAllowlist = List.of(); @@ -90,11 +93,12 @@ public static void main(String[] args) throws Exception { final String targetHost = arguments.targetHost; final String targetUser = arguments.targetUser; final String targetPass = arguments.targetPass; + final boolean targetInsecure = arguments.targetInsecure; final List indexTemplateAllowlist = arguments.indexTemplateAllowlist; final List componentTemplateAllowlist = arguments.componentTemplateAllowlist; final int awarenessDimensionality = arguments.minNumberOfReplicas + 1; - final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); + final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, targetInsecure); TryHandlePhaseFailure.executeWithTryCatch(() -> { log.info("Running RfsWorker"); diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml index 081132070..09ae73c12 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml +++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml @@ -82,12 +82,24 @@ services: image: 'opensearchproject/opensearch:latest' environment: - discovery.type=single-node + - plugins.security.disabled=true - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! networks: - migrations ports: - "29200:9200" + opensearchsource: + image: opensearchproject/opensearch + environment: + - discovery.type=single-node + - plugins.security.disabled=true + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! + - ~/.aws:/root/.aws + networks: + - migrations + ports: + - "39200:9200" migration-console: image: 'migrations/migration_console:latest' diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index 5d6952835..2732ec094 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -5,6 +5,7 @@ import console_link.logic.metrics as logic_metrics import console_link.logic.backfill as logic_backfill import console_link.logic.snapshot as logic_snapshot +import console_link.logic.metadata as logic_metadata from console_link.models.utils import ExitCode from console_link.environment import Environment @@ -198,6 +199,25 @@ def status_backfill_cmd(ctx): # ##################### METRICS ################### +@cli.group(name="metadata") +@click.pass_obj +def metadata_group(ctx): + """All actions related to metadata migration""" + if ctx.env.metadata is None: + raise click.UsageError("Metadata is not set") + + +@metadata_group.command(name="migrate") +@click.pass_obj +def migrate_metadata_cmd(ctx): + exitcode, message = logic_metadata.migrate(ctx.env.metadata) + if exitcode != ExitCode.SUCCESS: + raise click.ClickException(message) + click.echo(message) + +# ##################### METRICS ################### + + @cli.group(name="metrics") @click.pass_obj def metrics_group(ctx): diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index 4b4eff14e..3b5cfb8aa 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -66,7 +66,7 @@ class Cluster: aws_secret_arn: Optional[str] = None auth_type: Optional[AuthMethod] = None auth_details: Optional[Dict[str, Any]] = None - allow_insecure: bool = None + allow_insecure: bool = False def __init__(self, config: Dict) -> None: logger.info(f"Initializing cluster with config: {config}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py index 2b24e3f7c..7603b5636 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py @@ -120,6 +120,9 @@ def migrate(self) -> CommandResult: except KeyError as e: raise ValueError(f"Missing required auth details for target cluster: {e}") + if self._target_cluster.allow_insecure: + command.append("--target-insecure") + if self._index_allowlist: command.extend(["--index-allowlist", ", ".join(self._index_allowlist)]) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 3c55d2504..94ddd15e4 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -3,7 +3,7 @@ import logging import subprocess from typing import Dict, Optional -from console_link.models.cluster import Cluster +from console_link.models.cluster import AuthMethod, Cluster from console_link.models.command_result import CommandResult from cerberus import Validator @@ -63,8 +63,19 @@ def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Option self.s3_repo_uri = config['s3_repo_uri'] self.s3_region = config['s3_region'] + if source_cluster.auth_type is not AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") + + assert target_cluster is not None + if target_cluster.auth_type is not AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + def create(self, *args, **kwargs) -> CommandResult: assert isinstance(self.target_cluster, Cluster) + + logger.warning(self.source_cluster.call_api("/_cat/plugins").text) + + return CommandResult(success=True, value="Snapshot created successfully") command = [ "/root/createSnapshot/bin/CreateSnapshot", "--snapshot-name", self.snapshot_name, diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 7282f1a8c..c0c983ea9 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -1,15 +1,19 @@ source_cluster: - endpoint: "https://capture-proxy:9200" - allow_insecure: true - basic_auth: - username: "admin" - password: "admin" + endpoint: "http://opensearchsource:9200" + no_auth: + # endpoint: "http://capture-proxy:9200" + # allow_insecure: true + # no_auth: + #basic_auth: + #username: "admin" + #password: "admin" target_cluster: - endpoint: "https://opensearchtarget:9200" + endpoint: "http://opensearchtarget:9200" allow_insecure: true - basic_auth: - username: "admin" - password: "myStrongPassword123!" + no_auth: + # basic_auth: + # username: "admin" + # password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" @@ -18,14 +22,8 @@ backfill: docker: snapshot: snapshot_name: "snapshot_2023_01_01" - s3_repo_uri: "s3://my-snapshot-bucket" - s3_region: "us-east-2" + s3_repo_uri: "s3://os-demo-us-east-1-snapshots/2024-06-20-snapshot-test-0001/" + s3_region: "us-east-1" metadata_migration: from_snapshot: min_replicas: 0 - index_allowlist: - - "my_index" - index_template_allowlist: - - "my_index_template" - component-template-allowlist: - - "my_component_template" From 77c52e36e476d3d530b39f82d5e72db963c9bc7e Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Fri, 21 Jun 2024 14:22:20 -0600 Subject: [PATCH 04/20] Support local snapshots from CLI Signed-off-by: Mikayla Thompson --- .../src/main/java/com/rfs/CreateSnapshot.java | 42 +++++++++++++------ .../main/java/com/rfs/MetadataMigration.java | 34 +++++++++++---- .../java/com/rfs/common/FileSystemRepo.java | 4 ++ 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java index dd380c4f5..27b77e022 100644 --- a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java @@ -2,17 +2,17 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; -import com.rfs.common.UsernamePassword; +import com.rfs.common.FileSystemSnapshotCreator; +import com.rfs.common.OpenSearchClient; +import com.rfs.common.S3SnapshotCreator; +import com.rfs.common.SnapshotCreator; +import com.rfs.common.TryHandlePhaseFailure; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import com.rfs.common.ConnectionDetails; -import com.rfs.common.OpenSearchClient; -import com.rfs.common.SnapshotCreator; -import com.rfs.common.TryHandlePhaseFailure; -import com.rfs.common.S3SnapshotCreator; import com.rfs.worker.SnapshotRunner; import java.util.Optional; @@ -26,13 +26,18 @@ public static class Args { description = "The name of the snapshot to migrate") public String snapshotName; + @Parameter(names = {"--file-system-repo-path"}, + required = false, + description = "The full path to the snapshot repo on the file system.") + public String fileSystemRepoPath; + @Parameter(names = {"--s3-repo-uri"}, - required = true, + required = false, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2") public String s3RepoUri; @Parameter(names = {"--s3-region"}, - required = true, + required = false, description = "The AWS Region the S3 bucket is in, like: us-east-2" ) public String s3Region; @@ -70,12 +75,25 @@ public static void main(String[] args) throws Exception { .build() .parse(args); - log.info("Running CreateSnapshot with " + String.join(" ", args)); - run(c -> new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region), - new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure)); + if (arguments.fileSystemRepoPath == null && arguments.s3RepoUri == null) { + throw new ParameterException("Either file-system-repo-path or s3-repo-uri must be set"); + } + if (arguments.fileSystemRepoPath != null && arguments.s3RepoUri != null) { + throw new ParameterException("Only one of file-system-repo-path and s3-repo-uri can be set"); + } + if (arguments.s3RepoUri != null && arguments.s3Region == null) { + throw new ParameterException("If an s3 repo is being used, s3-region must be set"); + } + + log.info("Running CreateSnapshot with {}", String.join(" ", args)); + run(c -> ((arguments.fileSystemRepoPath != null) + ? new FileSystemSnapshotCreator(arguments.snapshotName, c, arguments.fileSystemRepoPath) + : new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region)), + new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure) + ); } - public static void run(Function snapshotCreatorFactory, + public static void run(Function snapshotCreatorFactory, OpenSearchClient openSearchClient) throws Exception { TryHandlePhaseFailure.executeWithTryCatch(() -> { diff --git a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java index 9f9d79540..361efc6f4 100644 --- a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java +++ b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java @@ -2,6 +2,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; import java.nio.file.Path; import java.nio.file.Paths; @@ -9,14 +10,15 @@ import com.rfs.common.ClusterVersion; import com.rfs.common.ConnectionDetails; +import com.rfs.common.FileSystemRepo; import com.rfs.common.GlobalMetadata; import com.rfs.common.IndexMetadata; import com.rfs.common.OpenSearchClient; -import com.rfs.common.S3Uri; import com.rfs.common.S3Repo; +import com.rfs.common.S3Uri; +import com.rfs.common.SnapshotRepo; import com.rfs.common.SourceRepo; import com.rfs.common.TryHandlePhaseFailure; -import com.rfs.common.SnapshotRepo; import com.rfs.transformers.TransformFunctions; import com.rfs.transformers.Transformer; import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10; @@ -35,13 +37,16 @@ public static class Args { @Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true) public String snapshotName; - @Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = true) + @Parameter(names = {"--file-system-repo-path"}, required = false, description = "The full path to the snapshot repo on the file system.") + public String fileSystemRepoPath; + + @Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = false) public String s3LocalDirPath; - @Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true) + @Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = false) public String s3RepoUri; - @Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true) + @Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = false) public String s3Region; @Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true) @@ -80,8 +85,19 @@ public static void main(String[] args) throws Exception { .build() .parse(args); + if (arguments.fileSystemRepoPath == null && arguments.s3RepoUri == null) { + throw new ParameterException("Either file-system-repo-path or s3-repo-uri must be set"); + } + if (arguments.fileSystemRepoPath != null && arguments.s3RepoUri != null) { + throw new ParameterException("Only one of file-system-repo-path and s3-repo-uri can be set"); + } + if ((arguments.s3RepoUri != null) && (arguments.s3Region == null || arguments.s3LocalDirPath == null)) { + throw new ParameterException("If an s3 repo is being used, s3-region and s3-local-dir-path must be set"); + } + final String snapshotName = arguments.snapshotName; - final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath); + final Path fileSystemRepoPath = arguments.fileSystemRepoPath != null ? Paths.get(arguments.fileSystemRepoPath): null; + final Path s3LocalDirPath = arguments.s3LocalDirPath != null ? Paths.get(arguments.s3LocalDirPath) : null; final String s3RepoUri = arguments.s3RepoUri; final String s3Region = arguments.s3Region; final String targetHost = arguments.targetHost; @@ -93,11 +109,15 @@ public static void main(String[] args) throws Exception { final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass); + + TryHandlePhaseFailure.executeWithTryCatch(() -> { log.info("Running RfsWorker"); OpenSearchClient targetClient = new OpenSearchClient(targetConnection); - final SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region); + final SourceRepo sourceRepo = fileSystemRepoPath != null + ? new FileSystemRepo(fileSystemRepoPath) + : S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region); final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); final GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider); final GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist); diff --git a/RFS/src/main/java/com/rfs/common/FileSystemRepo.java b/RFS/src/main/java/com/rfs/common/FileSystemRepo.java index 54aaa0a3e..80bfbec2e 100644 --- a/RFS/src/main/java/com/rfs/common/FileSystemRepo.java +++ b/RFS/src/main/java/com/rfs/common/FileSystemRepo.java @@ -1,5 +1,9 @@ package com.rfs.common; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; + import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; From 8b59c29f258edfca1dc7510434b7bd2241d5927f Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Fri, 21 Jun 2024 14:30:05 -0600 Subject: [PATCH 05/20] Put the imports back where they were Signed-off-by: Mikayla Thompson --- CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java index 27b77e022..e7b9e6ec2 100644 --- a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java @@ -4,18 +4,17 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import com.rfs.common.FileSystemSnapshotCreator; import com.rfs.common.OpenSearchClient; import com.rfs.common.S3SnapshotCreator; import com.rfs.common.SnapshotCreator; import com.rfs.common.TryHandlePhaseFailure; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - import com.rfs.worker.SnapshotRunner; -import java.util.Optional; import java.util.function.Function; @Slf4j From 519b845e44f3a8628f979ce4ed6872336195f849 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Fri, 21 Jun 2024 14:46:37 -0600 Subject: [PATCH 06/20] Merge support-file-system-snapshots into branch Signed-off-by: Mikayla Thompson --- .../src/main/docker/docker-compose.yml | 20 +++++++++++++++++- .../lib/console_link/services.yaml | 21 ++++++++++--------- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml index 09ae73c12..72fe5eaeb 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml +++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml @@ -84,6 +84,7 @@ services: - discovery.type=single-node - plugins.security.disabled=true - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! + - plugins.security.disabled=true networks: - migrations ports: @@ -109,13 +110,28 @@ services: - sharedReplayerOutput:/shared-replayer-output - ./migrationConsole/lib/console_link/services.yaml:/etc/migration_services.yaml # this is a convenience thing for testing -- it should be removed before this makes it to prod. - # - ./migrationConsole/lib/console_link:/root/lib/console_link + - ./migrationConsole/lib/console_link:/root/lib/console_link - ~/.aws:/root/.aws + - snapshot:/snapshot environment: - MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092 - AWS_PROFILE=default # command: ./runTestBenchmarks.sh + opensearchsource: + image: 'opensearchproject/opensearch:1' + volumes: + - snapshot:/snapshot + environment: + - discovery.type=single-node + - plugins.security.disabled=true + - path.repo=/snapshot + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! + - ~/.aws:/root/.aws + networks: + - migrations + ports: + - "39200:9200" volumes: sharedComparatorSqlResults: @@ -124,6 +140,8 @@ volumes: driver: local grafana_data: driver: local + snapshot: + driver: local networks: migrations: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index c0c983ea9..4b12ee2ea 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -1,19 +1,17 @@ source_cluster: endpoint: "http://opensearchsource:9200" no_auth: - # endpoint: "http://capture-proxy:9200" # allow_insecure: true - # no_auth: - #basic_auth: - #username: "admin" - #password: "admin" + # basic_auth: + # username: "admin" + # password: "admin" target_cluster: endpoint: "http://opensearchtarget:9200" - allow_insecure: true no_auth: + # allow_insecure: true # basic_auth: - # username: "admin" - # password: "myStrongPassword123!" + # username: "admin" + # password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" @@ -22,8 +20,11 @@ backfill: docker: snapshot: snapshot_name: "snapshot_2023_01_01" - s3_repo_uri: "s3://os-demo-us-east-1-snapshots/2024-06-20-snapshot-test-0001/" - s3_region: "us-east-1" + s3: + repo_uri: "s3://os-demo-us-east-1-snapshots/2024-06-20-snapshot-test-0001/" + aws_region: "us-east-1" + fs: + repo_path: "/snapshot/test" metadata_migration: from_snapshot: min_replicas: 0 From cc82c4950ab6272ff1e00b5c760220811868e11a Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Fri, 21 Jun 2024 14:50:15 -0600 Subject: [PATCH 07/20] Make changes for filesystem snapshots Signed-off-by: Mikayla Thompson --- .../console_link/console_link/environment.py | 18 ++- .../console_link/models/snapshot.py | 107 +++++++++++++----- 2 files changed, 88 insertions(+), 37 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index ad7cfc4a6..50468b918 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -1,19 +1,25 @@ import logging -from typing import Optional +from typing import Optional, Dict from console_link.models.cluster import Cluster from console_link.models.metrics_source import MetricsSource from console_link.logic.metrics import get_metrics_source from console_link.logic.backfill import get_backfill from console_link.models.backfill_base import Backfill -from console_link.models.snapshot import Snapshot, S3Snapshot +from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot import yaml from cerberus import Validator from console_link.models.metadata import Metadata - logger = logging.getLogger(__name__) + +def get_snapshot(config: Dict, source_cluster: Cluster, target_cluster: Cluster): + if 'fs' in config: + return FileSystemSnapshot(config, source_cluster, target_cluster) + return S3Snapshot(config, source_cluster, target_cluster) + + SCHEMA = { "source_cluster": {"type": "dict", "required": False}, "target_cluster": {"type": "dict", "required": True}, @@ -70,9 +76,9 @@ def __init__(self, config_file: str): logger.info("No backfill provided") if 'snapshot' in self.config: - self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"], - source_cluster=self.source_cluster, - target_cluster=self.target_cluster) + self.snapshot: Snapshot = get_snapshot(self.config["snapshot"], + source_cluster=self.source_cluster, + target_cluster=self.target_cluster) logger.info(f"Snapshot initialized: {self.snapshot}") else: logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 94ddd15e4..004ef687b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -7,6 +7,8 @@ from console_link.models.command_result import CommandResult from cerberus import Validator +from console_link.models.schema_tools import contains_one_of + logger = logging.getLogger(__name__) SnapshotStatus = Enum( @@ -17,6 +19,30 @@ "FAILED"]) +SNAPSHOT_SCHEMA = { + 'snapshot': { + 'type': 'dict', + 'schema': { + 'snapshot_name': {'type': 'string', 'required': True}, + 's3': { + 'type': 'dict', + 'schema': { + 'repo_uri': {'type': 'string', 'required': True}, + 'aws_region': {'type': 'string', 'required': True}, + } + }, + 'fs': { + 'type': 'dict', + 'schema': { + 'repo_path': {'type': 'string', 'required': True}, + } + } + }, + 'check_with': contains_one_of({'s3', 'fs'}) + } +} + + class Snapshot(ABC): """ Interface for creating and managing snapshots. @@ -25,6 +51,9 @@ def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Option self.config = config self.source_cluster = source_cluster self.target_cluster = target_cluster + v = Validator(SNAPSHOT_SCHEMA) + if not v.validate({'snapshot': config}): + raise ValueError("Invalid config file for snapshot", v.errors) @abstractmethod def create(self, *args, **kwargs) -> CommandResult: @@ -37,45 +66,22 @@ def status(self, *args, **kwargs) -> CommandResult: pass -S3_SNAPSHOT_SCHEMA = { - 'snapshot_name': { - 'type': 'string', - 'required': True - }, - 's3_repo_uri': { - 'type': 'string', - 'required': True - }, - 's3_region': { - 'type': 'string', - 'required': True - } -} - - class S3Snapshot(Snapshot): - def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluster) -> None: super().__init__(config, source_cluster, target_cluster) - v = Validator(S3_SNAPSHOT_SCHEMA) - if not v.validate(config): - raise ValueError("Invalid config file for snapshot", v.errors) - self.snapshot_name = config['snapshot_name'] - self.s3_repo_uri = config['s3_repo_uri'] - self.s3_region = config['s3_region'] - if source_cluster.auth_type is not AuthMethod.NO_AUTH: - raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") - - assert target_cluster is not None - if target_cluster.auth_type is not AuthMethod.NO_AUTH: - raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + self.snapshot_name = config['snapshot_name'] + self.s3_repo_uri = config['s3']['repo_uri'] + self.s3_region = config['s3']['aws_region'] def create(self, *args, **kwargs) -> CommandResult: assert isinstance(self.target_cluster, Cluster) + if self.source_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") - logger.warning(self.source_cluster.call_api("/_cat/plugins").text) + if self.target_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") - return CommandResult(success=True, value="Snapshot created successfully") command = [ "/root/createSnapshot/bin/CreateSnapshot", "--snapshot-name", self.snapshot_name, @@ -102,3 +108,42 @@ def create(self, *args, **kwargs) -> CommandResult: def status(self, *args, **kwargs) -> CommandResult: return CommandResult(success=False, value="Command not implemented") + + +class FileSystemSnapshot(Snapshot): + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluster) -> None: + super().__init__(config, source_cluster, target_cluster) + self.snapshot_name = config['snapshot_name'] + self.repo_path = config['fs']['repo_path'] + + if source_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") + + if target_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + + def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) + + command = [ + "/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", self.snapshot_name, + "--file-system-repo-path", self.repo_path, + "--source-host", self.source_cluster.endpoint, + "--target-host", self.target_cluster.endpoint, + ] + + if self.source_cluster.allow_insecure: + command.append("--source-insecure") + if self.target_cluster.allow_insecure: + command.append("--target-insecure") + + logger.info(f"Creating snapshot with command: {' '.join(command)}") + try: + # Pass None to stdout and stderr to not capture output and show in terminal + subprocess.run(command, stdout=None, stderr=None, text=True, check=True) + logger.info(f"Snapshot {self.config['snapshot_name']} created successfully") + return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create snapshot: {str(e)}") + return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}") From 6a6e7a84d7d47b74d077f385def874afc7a3e3b8 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Fri, 21 Jun 2024 14:50:15 -0600 Subject: [PATCH 08/20] Make changes for filesystem snapshots Signed-off-by: Mikayla Thompson --- .../console_link/console_link/environment.py | 17 ++- .../console_link/models/snapshot.py | 103 ++++++++++++++---- 2 files changed, 92 insertions(+), 28 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index d3b83b470..aa02e7397 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -1,17 +1,24 @@ import logging -from typing import Optional +from typing import Optional, Dict from console_link.models.cluster import Cluster from console_link.models.metrics_source import MetricsSource from console_link.logic.metrics import get_metrics_source from console_link.logic.backfill import get_backfill from console_link.models.backfill_base import Backfill -from console_link.models.snapshot import Snapshot, S3Snapshot +from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot import yaml from cerberus import Validator logger = logging.getLogger(__name__) + +def get_snapshot(config: Dict, source_cluster: Cluster, target_cluster: Cluster): + if 'fs' in config: + return FileSystemSnapshot(config, source_cluster, target_cluster) + return S3Snapshot(config, source_cluster, target_cluster) + + SCHEMA = { "source_cluster": {"type": "dict", "required": False}, "target_cluster": {"type": "dict", "required": True}, @@ -66,9 +73,9 @@ def __init__(self, config_file: str): logger.info("No backfill provided") if 'snapshot' in self.config: - self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"], - source_cluster=self.source_cluster, - target_cluster=self.target_cluster) + self.snapshot: Snapshot = get_snapshot(self.config["snapshot"], + source_cluster=self.source_cluster, + target_cluster=self.target_cluster) logger.info(f"Snapshot initialized: {self.snapshot}") else: logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 690ad7f7b..004ef687b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -3,10 +3,12 @@ import logging import subprocess from typing import Dict, Optional -from console_link.models.cluster import Cluster +from console_link.models.cluster import AuthMethod, Cluster from console_link.models.command_result import CommandResult from cerberus import Validator +from console_link.models.schema_tools import contains_one_of + logger = logging.getLogger(__name__) SnapshotStatus = Enum( @@ -17,6 +19,30 @@ "FAILED"]) +SNAPSHOT_SCHEMA = { + 'snapshot': { + 'type': 'dict', + 'schema': { + 'snapshot_name': {'type': 'string', 'required': True}, + 's3': { + 'type': 'dict', + 'schema': { + 'repo_uri': {'type': 'string', 'required': True}, + 'aws_region': {'type': 'string', 'required': True}, + } + }, + 'fs': { + 'type': 'dict', + 'schema': { + 'repo_path': {'type': 'string', 'required': True}, + } + } + }, + 'check_with': contains_one_of({'s3', 'fs'}) + } +} + + class Snapshot(ABC): """ Interface for creating and managing snapshots. @@ -25,6 +51,9 @@ def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Option self.config = config self.source_cluster = source_cluster self.target_cluster = target_cluster + v = Validator(SNAPSHOT_SCHEMA) + if not v.validate({'snapshot': config}): + raise ValueError("Invalid config file for snapshot", v.errors) @abstractmethod def create(self, *args, **kwargs) -> CommandResult: @@ -37,33 +66,22 @@ def status(self, *args, **kwargs) -> CommandResult: pass -S3_SNAPSHOT_SCHEMA = { - 'snapshot_name': { - 'type': 'string', - 'required': True - }, - 's3_repo_uri': { - 'type': 'string', - 'required': True - }, - 's3_region': { - 'type': 'string', - 'required': True - } -} - - class S3Snapshot(Snapshot): - def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluster) -> None: super().__init__(config, source_cluster, target_cluster) - v = Validator(S3_SNAPSHOT_SCHEMA) - if not v.validate(config): - raise ValueError("Invalid config file for snapshot", v.errors) + self.snapshot_name = config['snapshot_name'] - self.s3_repo_uri = config['s3_repo_uri'] - self.s3_region = config['s3_region'] + self.s3_repo_uri = config['s3']['repo_uri'] + self.s3_region = config['s3']['aws_region'] def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) + if self.source_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") + + if self.target_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + command = [ "/root/createSnapshot/bin/CreateSnapshot", "--snapshot-name", self.snapshot_name, @@ -90,3 +108,42 @@ def create(self, *args, **kwargs) -> CommandResult: def status(self, *args, **kwargs) -> CommandResult: return CommandResult(success=False, value="Command not implemented") + + +class FileSystemSnapshot(Snapshot): + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluster) -> None: + super().__init__(config, source_cluster, target_cluster) + self.snapshot_name = config['snapshot_name'] + self.repo_path = config['fs']['repo_path'] + + if source_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") + + if target_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + + def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) + + command = [ + "/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", self.snapshot_name, + "--file-system-repo-path", self.repo_path, + "--source-host", self.source_cluster.endpoint, + "--target-host", self.target_cluster.endpoint, + ] + + if self.source_cluster.allow_insecure: + command.append("--source-insecure") + if self.target_cluster.allow_insecure: + command.append("--target-insecure") + + logger.info(f"Creating snapshot with command: {' '.join(command)}") + try: + # Pass None to stdout and stderr to not capture output and show in terminal + subprocess.run(command, stdout=None, stderr=None, text=True, check=True) + logger.info(f"Snapshot {self.config['snapshot_name']} created successfully") + return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create snapshot: {str(e)}") + return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}") From 5ef53021551c544b1e1d5ead3abb48e106d3eb16 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Sat, 22 Jun 2024 12:35:02 -0600 Subject: [PATCH 09/20] fix up services.yaml Signed-off-by: Mikayla Thompson --- .../lib/console_link/services.yaml | 17 +++++++++-------- .../lib/console_link/tests/data/services.yaml | 19 ++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 521992379..0ac9e0666 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -2,21 +2,22 @@ source_cluster: endpoint: "https://capture-proxy:9200" allow_insecure: true basic_auth: - username: "admin" - password: "admin" + username: "admin" + password: "admin" target_cluster: endpoint: "https://opensearchtarget:9200" allow_insecure: true basic_auth: - username: "admin" - password: "myStrongPassword123!" + username: "admin" + password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" backfill: - reindex_from_snapshot: - docker: + reindex_from_snapshot: + docker: snapshot: snapshot_name: "snapshot_2023_01_01" - s3_repo_uri: "s3://my-snapshot-bucket" - s3_region: "us-east-2" + s3: + repo_uri: "s3://my-snapshot-bucket" + aws_region: "us-east-2" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml index abca9d89a..caa08cafe 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml @@ -2,21 +2,22 @@ source_cluster: endpoint: "https://capture-proxy-es:9200" allow_insecure: true basic_auth: - username: "admin" - password: "admin" + username: "admin" + password: "admin" target_cluster: endpoint: "https://opensearchtarget:9200" allow_insecure: true basic_auth: - username: "admin" - password: "myStrongPassword123!" + username: "admin" + password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" backfill: - reindex_from_snapshot: - docker: + reindex_from_snapshot: + docker: snapshot: - snapshot_name: "test_snapshot" - s3_repo_uri: "s3://test-bucket" - s3_region: "us-east-2" + snapshot_name: "snapshot_2023_01_01" + s3: + repo_uri: "s3://my-snapshot-bucket" + aws_region: "us-east-2" From b37654323a20e883405e8684cbe6c2f17accbcca Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Sun, 23 Jun 2024 22:35:57 -0600 Subject: [PATCH 10/20] Add tests Signed-off-by: Mikayla Thompson --- .../console_link/models/snapshot.py | 3 + .../lib/console_link/tests/test_snapshot.py | 60 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 004ef687b..d16f03cf3 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -147,3 +147,6 @@ def create(self, *args, **kwargs) -> CommandResult: except subprocess.CalledProcessError as e: logger.error(f"Failed to create snapshot: {str(e)}") return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}") + + def status(self, *args, **kwargs) -> CommandResult: + return CommandResult(success=False, value="Command not implemented") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py new file mode 100644 index 000000000..f446ef547 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py @@ -0,0 +1,60 @@ +from console_link.models.snapshot import S3Snapshot, FileSystemSnapshot, Snapshot +from console_link.environment import get_snapshot +from console_link.models.cluster import AuthMethod +from tests.utils import create_valid_cluster + + +def test_s3_snapshot_init_succeeds(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + snapshot = S3Snapshot(config['snapshot'], create_valid_cluster(), None) + assert isinstance(snapshot, Snapshot) + + +def test_fs_snapshot_init_succeeds(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + } + } + snapshot = FileSystemSnapshot(config["snapshot"], create_valid_cluster(auth_type=AuthMethod.NO_AUTH), + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + assert isinstance(snapshot, Snapshot) + + +def test_get_snapshot_for_s3_config(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + snapshot = get_snapshot(config["snapshot"], create_valid_cluster(), None) + assert isinstance(snapshot, S3Snapshot) + + +def test_get_snapshot_for_fs_config(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + } + } + snapshot = get_snapshot(config["snapshot"], create_valid_cluster(auth_type=AuthMethod.NO_AUTH), + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + assert isinstance(snapshot, FileSystemSnapshot) From a75c5f6258d5f37fd5fa876cff4eddd12a067fa5 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Sun, 23 Jun 2024 22:45:33 -0600 Subject: [PATCH 11/20] remove target-host parameter Signed-off-by: Mikayla Thompson --- .../lib/console_link/console_link/models/snapshot.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index d16f03cf3..c474802fa 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -88,7 +88,6 @@ def create(self, *args, **kwargs) -> CommandResult: "--s3-repo-uri", self.s3_repo_uri, "--s3-region", self.s3_region, "--source-host", self.source_cluster.endpoint, - "--target-host", self.target_cluster.endpoint, ] if self.source_cluster.allow_insecure: @@ -130,7 +129,6 @@ def create(self, *args, **kwargs) -> CommandResult: "--snapshot-name", self.snapshot_name, "--file-system-repo-path", self.repo_path, "--source-host", self.source_cluster.endpoint, - "--target-host", self.target_cluster.endpoint, ] if self.source_cluster.allow_insecure: From 2f2ca1b582a735fbc1e9896d6082178658205d40 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Sun, 23 Jun 2024 23:05:25 -0600 Subject: [PATCH 12/20] Update cdk Signed-off-by: Mikayla Thompson --- .../lib/migration-services-yaml.ts | 21 ++++++++++++---- .../reindex-from-snapshot-stack.ts | 9 ++++--- .../test/migration-services-yaml.test.ts | 24 +++++++++++++++++-- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts index 514834ddd..4d01c91e6 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts @@ -42,16 +42,27 @@ export class OSIBackfillYaml { } } +export class FileSystemSnapshotYaml { + repo_path: string = ''; +} + export class S3SnapshotYaml { + repo_uri: string = ''; + aws_region: string = ''; +} + +export class SnapshotYaml { snapshot_name: string = ''; - s3_repo_uri: string = ''; - s3_region: string = ''; + s3?: S3SnapshotYaml; + fs?: FileSystemSnapshotYaml; toDict() { return { snapshot_name: this.snapshot_name, - s3_repo_uri: this.s3_repo_uri, - s3_region: this.s3_region + // This conditinally includes the s3 and fs parameters if they're defined, + // but does not add the keys otherwise + ...(this.s3 && { s3: this.s3 }), + ...(this.fs && { fs: this.fs }) }; } } @@ -61,7 +72,7 @@ export class ServicesYaml { target_cluster: ClusterYaml; metrics_source: MetricsSourceYaml = new MetricsSourceYaml(); backfill: RFSBackfillYaml | OSIBackfillYaml; - snapshot?: S3SnapshotYaml; + snapshot?: SnapshotYaml; stringify(): string { return yaml.stringify({ diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts index 5b3627ac1..e144825fc 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts @@ -11,7 +11,7 @@ import { createOpenSearchServerlessIAMAccessPolicy, getMigrationStringParameterValue } from "../common-utilities"; -import { RFSBackfillYaml, S3SnapshotYaml } from "../migration-services-yaml"; +import { RFSBackfillYaml, SnapshotYaml } from "../migration-services-yaml"; export interface ReindexFromSnapshotProps extends StackPropsExt { @@ -23,7 +23,7 @@ export interface ReindexFromSnapshotProps extends StackPropsExt { export class ReindexFromSnapshotStack extends MigrationServiceCore { rfsBackfillYaml: RFSBackfillYaml; - rfsSnapshotYaml: S3SnapshotYaml; + rfsSnapshotYaml: SnapshotYaml; constructor(scope: Construct, id: string, props: ReindexFromSnapshotProps) { super(scope, id, props) @@ -85,9 +85,8 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { this.rfsBackfillYaml = new RFSBackfillYaml(); this.rfsBackfillYaml.ecs.cluster_name = `migration-${props.stage}-ecs-cluster`; this.rfsBackfillYaml.ecs.service_name = `migration-${props.stage}-reindex-from-snapshot`; - this.rfsSnapshotYaml = new S3SnapshotYaml(); - this.rfsSnapshotYaml.s3_repo_uri = s3Uri; - this.rfsSnapshotYaml.s3_region = this.region; + this.rfsSnapshotYaml = new SnapshotYaml(); + this.rfsSnapshotYaml.s3 = {repo_uri: s3Uri, aws_region: this.region}; this.rfsSnapshotYaml.snapshot_name = "rfs-snapshot"; } diff --git a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts index 2f7a38f91..0eb70d25f 100644 --- a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts @@ -1,4 +1,4 @@ -import { ClusterYaml, RFSBackfillYaml, ServicesYaml } from "../lib/migration-services-yaml" +import { ClusterYaml, RFSBackfillYaml, ServicesYaml, SnapshotYaml } from "../lib/migration-services-yaml" test('Test default servicesYaml can be stringified', () => { const servicesYaml = new ServicesYaml(); @@ -57,4 +57,24 @@ test('Test servicesYaml without backfill does not include backend section', () = let servicesYaml = new ServicesYaml(); const yaml = servicesYaml.stringify(); expect(yaml).toBe(`metrics_source:\n cloudwatch:\n`); -}) \ No newline at end of file +}) + +test('Test SnapshotYaml for filesystem only includes fs', () => { + let fsSnapshot = new SnapshotYaml(); + fsSnapshot.fs = {"repo_path": "/path/to/shared/volume"} + const fsSnapshotDict = fsSnapshot.toDict() + expect(fsSnapshotDict).toBeDefined(); + expect(fsSnapshotDict).toHaveProperty("fs"); + expect(fsSnapshotDict["fs"]).toHaveProperty("repo_path"); + expect(fsSnapshotDict).not.toHaveProperty("s3"); +}) + +test('Test SnapshotYaml for s3 only includes s3', () => { + let s3Snapshot = new SnapshotYaml(); + s3Snapshot.s3 = {"repo_uri": "s3://repo/path", "aws_region": "us-east-1"} + const s3SnapshotDict = s3Snapshot.toDict() + expect(s3SnapshotDict).toBeDefined(); + expect(s3SnapshotDict).toHaveProperty("s3"); + expect(s3SnapshotDict["s3"]).toHaveProperty("repo_uri"); + expect(s3SnapshotDict).not.toHaveProperty("fs"); +}) From 293ab14c0b432a66978b8ce087211ae551cf6826 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Sun, 23 Jun 2024 23:23:22 -0600 Subject: [PATCH 13/20] Add a few more snapshot tests Signed-off-by: Mikayla Thompson --- .../lib/console_link/tests/test_snapshot.py | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py index f446ef547..ddaec0a7e 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py @@ -2,6 +2,7 @@ from console_link.environment import get_snapshot from console_link.models.cluster import AuthMethod from tests.utils import create_valid_cluster +import pytest def test_s3_snapshot_init_succeeds(): @@ -58,3 +59,121 @@ def test_get_snapshot_for_fs_config(): snapshot = get_snapshot(config["snapshot"], create_valid_cluster(auth_type=AuthMethod.NO_AUTH), create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) assert isinstance(snapshot, FileSystemSnapshot) + + +def test_get_snapshot_fails_for_invalid_config(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "invalid": { + "key": "value" + }, + } + } + with pytest.raises(ValueError) as excinfo: + get_snapshot(config["snapshot"], create_valid_cluster(), create_valid_cluster()) + assert "Invalid config file for snapshot" in str(excinfo.value.args[0]) + + +def test_get_snpashot_fails_for_config_with_fs_and_s3(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + with pytest.raises(ValueError) as excinfo: + get_snapshot(config["snapshot"], create_valid_cluster(), create_valid_cluster()) + assert "Invalid config file for snapshot" in str(excinfo.value.args[0]) + + +def test_fs_snapshot_create_calls_subprocess_run_with_correct_args(mocker): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + } + } + source = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + snapshot = FileSystemSnapshot(config["snapshot"], source, + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + + mock = mocker.patch("subprocess.run") + snapshot.create() + + mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", config["snapshot"]["snapshot_name"], + "--file-system-repo-path", config["snapshot"]["fs"]["repo_path"], + "--source-host", source.endpoint, + "--source-insecure", "--target-insecure"], + stdout=None, stderr=None, text=True, check=True) + + +def test_s3_snapshot_create_calls_subprocess_run_with_correct_args(mocker): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + source = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + snapshot = S3Snapshot(config["snapshot"], source, create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + + mock = mocker.patch("subprocess.run") + snapshot.create() + + mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", config["snapshot"]["snapshot_name"], + "--s3-repo-uri", config["snapshot"]["s3"]["repo_uri"], + "--s3-region", config["snapshot"]["s3"]["aws_region"], + "--source-host", source.endpoint, + "--source-insecure", "--target-insecure"], + stdout=None, stderr=None, text=True, check=True) + + +@pytest.mark.parametrize("source_auth,target_auth", [(AuthMethod.NO_AUTH, AuthMethod.BASIC_AUTH), + (AuthMethod.BASIC_AUTH, AuthMethod.NO_AUTH)]) +def test_s3_snapshot_create_fails_for_clusters_with_auth(source_auth, target_auth): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + snapshot = S3Snapshot(config["snapshot"], create_valid_cluster(auth_type=source_auth), + create_valid_cluster(auth_type=target_auth)) + with pytest.raises(NotImplementedError) as excinfo: + snapshot.create() + assert "authentication is not supported" in str(excinfo.value.args[0]) + + +@pytest.mark.parametrize("source_auth,target_auth", [(AuthMethod.NO_AUTH, AuthMethod.BASIC_AUTH), + (AuthMethod.BASIC_AUTH, AuthMethod.NO_AUTH)]) +def test_fs_snapshot_create_fails_for_clusters_with_auth(source_auth, target_auth): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/to/repo" + }, + } + } + with pytest.raises(NotImplementedError) as excinfo: + snapshot = FileSystemSnapshot(config["snapshot"], create_valid_cluster(auth_type=source_auth), + create_valid_cluster(auth_type=target_auth)) + snapshot.create() + assert "authentication is not supported" in str(excinfo.value.args[0]) From 6364600b2237ffaa59d00860debaf734e18e3b80 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Mon, 24 Jun 2024 08:36:01 -0600 Subject: [PATCH 14/20] Add fs snapshot to service.yaml for docker Signed-off-by: Mikayla Thompson --- .../docker/migrationConsole/lib/console_link/services.yaml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 0ac9e0666..e9e901d3f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -17,7 +17,6 @@ backfill: reindex_from_snapshot: docker: snapshot: - snapshot_name: "snapshot_2023_01_01" - s3: - repo_uri: "s3://my-snapshot-bucket" - aws_region: "us-east-2" + snapshot_name: "snapshot_2024_06_21" + fs: + repo_path: "/snapshot/test-console" From 862e15ccaa92e96d9bb7607b604d70aa6c568024 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Mon, 24 Jun 2024 08:56:01 -0600 Subject: [PATCH 15/20] Move auth check to snapshot create instead of init Signed-off-by: Mikayla Thompson --- .../lib/console_link/console_link/models/snapshot.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index c474802fa..9b986cd3b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -115,15 +115,15 @@ def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluste self.snapshot_name = config['snapshot_name'] self.repo_path = config['fs']['repo_path'] - if source_cluster.auth_type != AuthMethod.NO_AUTH: + def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) + + if self.source_cluster.auth_type != AuthMethod.NO_AUTH: raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") - if target_cluster.auth_type != AuthMethod.NO_AUTH: + if self.target_cluster.auth_type != AuthMethod.NO_AUTH: raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") - def create(self, *args, **kwargs) -> CommandResult: - assert isinstance(self.target_cluster, Cluster) - command = [ "/root/createSnapshot/bin/CreateSnapshot", "--snapshot-name", self.snapshot_name, From d56d1b71ef28becc71af8af9aba27b39d28342b1 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Mon, 24 Jun 2024 10:10:41 -0600 Subject: [PATCH 16/20] Get metadata working for fs snapshot Signed-off-by: Mikayla Thompson --- .../lib/console_link/console_link/cli.py | 4 +- .../console_link/console_link/environment.py | 2 + .../console_link/models/metadata.py | 100 +++++++++++------- .../lib/console_link/tests/test_metadata.py | 31 +++--- .../lib/console_link/tests/test_snapshot.py | 4 +- 5 files changed, 89 insertions(+), 52 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index a0c943115..87cf5aba0 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -37,10 +37,10 @@ def __init__(self, config_file) -> None: @click.option('-v', '--verbose', count=True, help="Verbosity level. Default is warn, -v is info, -vv is debug.") @click.pass_context def cli(ctx, config_file, json, verbose): - ctx.obj = Context(config_file) - ctx.obj.json = json logging.basicConfig(level=logging.WARN - (10 * verbose)) logger.info(f"Logging set to {logging.getLevelName(logger.getEffectiveLevel())}") + ctx.obj = Context(config_file) + ctx.obj.json = json # ##################### CLUSTERS ################### diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index 50468b918..27ac6c269 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -40,9 +40,11 @@ class Environment: metadata: Optional[Metadata] = None def __init__(self, config_file: str): + logger.info(f"Loading config file: {config_file}") self.config_file = config_file with open(self.config_file) as f: self.config = yaml.safe_load(f) + logger.info(f"Loaded config file: {self.config}") v = Validator(SCHEMA) if not v.validate(self.config): logger.error(f"Config file validation errors: {v.errors}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py index 7603b5636..2edeca3d1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py @@ -7,7 +7,7 @@ from console_link.models.command_result import CommandResult from console_link.models.schema_tools import list_schema from console_link.models.cluster import AuthMethod, Cluster -from console_link.models.snapshot import S3Snapshot, Snapshot +from console_link.models.snapshot import S3Snapshot, Snapshot, FileSystemSnapshot logger = logging.getLogger(__name__) @@ -22,16 +22,25 @@ "snapshot_name": {"type": "string", "required": True}, "local_dir": {"type": "string", "required": False}, "s3": { - "type": "dict", - # Again, in the future there should be a 'local' option, but for now this block is required if the - # snapshot details are being specified. - "required": True, - "schema": { - "repo_uri": {"type": "string", "required": True}, - "aws_region": {"type": "string", "required": True} + 'type': 'dict', + "required": False, + 'schema': { + 'repo_uri': {'type': 'string', 'required': True}, + 'aws_region': {'type': 'string', 'required': True}, } }, - } + "fs": { + 'type': 'dict', + "required": False, + 'schema': { + 'repo_path': {'type': 'string', 'required': True}, + } + } + }, + # We _should_ have the check below, but I need to figure out how to combine it with a potentially + # nullable block (like this one) + # 'check_with': contains_one_of({'s3', 'fs'}) + } SCHEMA = { @@ -49,52 +58,63 @@ def generate_tmp_dir(name: str) -> str: class Metadata: def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] = None): + logger.debug(f"Initializing Metadata with config: {config}") v = Validator(SCHEMA) if not v.validate(config): + logger.error(f"Invalid config: {v.errors}") raise ValueError(v.errors) self._config = config self._target_cluster = target_cluster - - if not isinstance(snapshot, S3Snapshot): - # We can't currently use the snapshot info unless it's in S3. We need to ignore it. - logger.info("Ignoring externally specified snapshot info since it's not in S3.") - snapshot = None + self._snapshot = snapshot if (not snapshot) and (config["from_snapshot"] is None): raise ValueError("No snapshot is specified or can be assumed " "for the metadata migration to use.") - + self._min_replicas = config.get("min_replicas", 0) self._index_allowlist = config.get("index_allowlist", None) self._index_template_allowlist = config.get("index_template_allowlist", None) self._component_template_allowlist = config.get("component-template-allowlist", None) + logger.debug(f"Min replicas: {self._min_replicas}") + logger.debug(f"Index allowlist: {self._index_allowlist}") + logger.debug(f"Index template allowlist: {self._index_template_allowlist}") + logger.debug(f"Component template allowlist: {self._component_template_allowlist}") # If `from_snapshot` is fully specified, use those values to define snapshot params if config["from_snapshot"] is not None: - logger.info("Defining snapshot params for metadata migration from config file") + logger.debug("Using fully specified snapshot config") + self._snapshot_location = 's3' if 's3' in config["from_snapshot"] else 'fs' self._snapshot_name = config["from_snapshot"]["snapshot_name"] - if "local_dir" in config["from_snapshot"]: - self._local_dir = config["from_snapshot"]["local_dir"] + + if self._snapshot_location == 'fs': + self._repo_path = config["from_snapshot"]["fs"]["repo_path"] else: - self._local_dir = generate_tmp_dir(self._snapshot_name) - self._s3_uri = config["from_snapshot"]["s3"]["repo_uri"] - self._aws_region = config["from_snapshot"]["s3"]["aws_region"] + self._s3_uri = config["from_snapshot"]["s3"]["repo_uri"] + self._aws_region = config["from_snapshot"]["s3"]["aws_region"] else: - assert snapshot is not None # This follows from the logic above, but I'm asserting it to make mypy happy - logger.info("Defining snapshot params for metadata migration from Snapshot object") + logger.debug("Using independently specified snapshot") + assert isinstance(snapshot, Snapshot) # This is assumable from logic, but specifying it for mypy self._snapshot_name = snapshot.snapshot_name + if isinstance(snapshot, S3Snapshot): + self._snapshot_location = "s3" + self._s3_uri = snapshot.s3_repo_uri + self._aws_region = snapshot.s3_region + elif isinstance(snapshot, FileSystemSnapshot): + self._snapshot_location = "fs" + self._repo_path = snapshot.repo_path + + if config["from_snapshot"] is not None and "local_dir" in config["from_snapshot"]: + self._local_dir = config["from_snapshot"]["local_dir"] + else: self._local_dir = generate_tmp_dir(self._snapshot_name) - self._s3_uri = snapshot.s3_repo_uri - self._aws_region = snapshot.s3_region - + logger.debug(f"Snapshot name: {self._snapshot_name}") - logger.debug(f"Local dir: {self._local_dir}") - logger.debug(f"S3 URI: {self._s3_uri}") - logger.debug(f"AWS region: {self._aws_region}") - logger.debug(f"Min replicas: {self._min_replicas}") - logger.debug(f"Index allowlist: {self._index_allowlist}") - logger.debug(f"Index template allowlist: {self._index_template_allowlist}") - logger.debug(f"Component template allowlist: {self._component_template_allowlist}") + if self._snapshot_location == 's3': + logger.debug(f"S3 URI: {self._s3_uri}") + logger.debug(f"AWS region: {self._aws_region}") + else: + logger.debug(f"Local dir: {self._local_dir}") + logger.info("Metadata migration configuration defined") def migrate(self) -> CommandResult: @@ -103,12 +123,20 @@ def migrate(self) -> CommandResult: "/root/metadataMigration/bin/MetadataMigration", # Initially populate only the required params "--snapshot-name", self._snapshot_name, - "--s3-local-dir", self._local_dir, - "--s3-repo-uri", self._s3_uri, - "--s3-region", self._aws_region, "--target-host", self._target_cluster.endpoint, "--min-replicas", str(self._min_replicas) ] + if self._snapshot_location == 's3': + command.extend([ + "--s3-local-dir", self._local_dir, + "--s3-repo-uri", self._s3_uri, + "--s3-region", self._aws_region, + ]) + elif self._snapshot_location == 'fs': + command.extend([ + "--file-system-repo-path", self._repo_path, + ]) + if self._target_cluster.auth_details == AuthMethod.BASIC_AUTH: try: command.extend([ diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py index 8b1db9e6d..e0dd43971 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py @@ -76,11 +76,13 @@ def test_metadata_init_with_partial_snapshot_config_no_external_snapshot_fails() def test_metadata_init_with_minimal_config_and_external_snapshot_succeeds(): snapshot_config = { - "snapshot_name": "test_snapshot", - "s3_repo_uri": "s3://test-bucket", - "s3_region": "us-east-2" + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + } } - snapshot = S3Snapshot(snapshot_config, create_valid_cluster) + snapshot = S3Snapshot(snapshot_config, create_valid_cluster(), create_valid_cluster()) config = { "from_snapshot": None, } @@ -90,13 +92,16 @@ def test_metadata_init_with_minimal_config_and_external_snapshot_succeeds(): assert isinstance(metadata, Metadata) +@pytest.mark.skip("Need to tighten up the schema specification to catch this case") def test_metadata_init_with_partial_config_and_external_snapshot_fails(): snapshot_config = { - "snapshot_name": "test_snapshot", - "s3_repo_uri": "s3://test-bucket", - "s3_region": "us-east-2" + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + } } - snapshot = S3Snapshot(snapshot_config, create_valid_cluster) + snapshot = S3Snapshot(snapshot_config, create_valid_cluster(), create_valid_cluster()) config = { "from_snapshot": { "local_dir": "/tmp/s3", @@ -112,11 +117,13 @@ def test_metadata_init_with_partial_config_and_external_snapshot_fails(): def test_full_config_and_snapshot_gives_priority_to_config(): snapshot_config = { - "snapshot_name": "test_snapshot", - "s3_repo_uri": "s3://test-bucket", - "s3_region": "us-east-2" + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + } } - snapshot = S3Snapshot(snapshot_config, create_valid_cluster) + snapshot = S3Snapshot(snapshot_config, create_valid_cluster(), create_valid_cluster()) config = { "from_snapshot": { "local_dir": "/tmp/s3", diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py index ddaec0a7e..5e8fe63d9 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py @@ -15,7 +15,7 @@ def test_s3_snapshot_init_succeeds(): }, } } - snapshot = S3Snapshot(config['snapshot'], create_valid_cluster(), None) + snapshot = S3Snapshot(config['snapshot'], create_valid_cluster(), create_valid_cluster()) assert isinstance(snapshot, Snapshot) @@ -43,7 +43,7 @@ def test_get_snapshot_for_s3_config(): }, } } - snapshot = get_snapshot(config["snapshot"], create_valid_cluster(), None) + snapshot = get_snapshot(config["snapshot"], create_valid_cluster(), create_valid_cluster()) assert isinstance(snapshot, S3Snapshot) From c790c5216f83d4d30b760da6e5825944c310e789 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Mon, 24 Jun 2024 16:05:11 -0600 Subject: [PATCH 17/20] Add detached metadata migration, simplify logic Signed-off-by: Mikayla Thompson --- FetchMigration/Dockerfile | 2 +- .../main/docker/migrationConsole/Dockerfile | 4 +- .../lib/console_link/console_link/cli.py | 5 +- .../console_link/logic/metadata.py | 9 ++- .../console_link/models/cluster.py | 1 + .../console_link/models/metadata.py | 69 +++++++++++++------ .../console_link/console_link/models/utils.py | 6 ++ 7 files changed, 68 insertions(+), 28 deletions(-) diff --git a/FetchMigration/Dockerfile b/FetchMigration/Dockerfile index f0399b9c2..41b3fb121 100644 --- a/FetchMigration/Dockerfile +++ b/FetchMigration/Dockerfile @@ -5,7 +5,7 @@ COPY python/Pipfile . RUN apt -y update RUN apt -y install python3 python3-pip RUN pip3 install pipenv -RUN pipenv install --system --deploy --ignore-pipfile +RUN pipenv install --deploy --ignore-pipfile ENV FM_CODE_PATH /code diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile index f3614031b..09bae3f3b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile @@ -38,12 +38,12 @@ RUN chmod ug+x /root/loadServicesFromParameterStore.sh COPY lib /root/lib WORKDIR /root/lib/console_link -RUN pipenv install --system --deploy --ignore-pipfile +RUN pipenv install --deploy --ignore-pipfile # Experimental console API, not started by default COPY console_api /root/console_api WORKDIR /root/console_api -RUN pipenv install --system --deploy --ignore-pipfile +RUN pipenv install --deploy --ignore-pipfile WORKDIR /root #CMD pipenv run python manage.py runserver_plus 0.0.0.0:8000 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py index 87cf5aba0..ad7117ba2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py @@ -248,9 +248,10 @@ def metadata_group(ctx): @metadata_group.command(name="migrate") +@click.option("--detach", is_flag=True, help="Run metadata migration in detached mode") @click.pass_obj -def migrate_metadata_cmd(ctx): - exitcode, message = logic_metadata.migrate(ctx.env.metadata) +def migrate_metadata_cmd(ctx, detach): + exitcode, message = logic_metadata.migrate(ctx.env.metadata, detach) if exitcode != ExitCode.SUCCESS: raise click.ClickException(message) click.echo(message) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py index 3637446ae..8db2289e7 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py @@ -1,16 +1,19 @@ from typing import Tuple from console_link.models.metadata import Metadata -from console_link.models.utils import ExitCode +from console_link.models.utils import ExitCode, generate_log_file_path import logging logger = logging.getLogger(__name__) -def migrate(metadata: Metadata) -> Tuple[ExitCode, str]: +def migrate(metadata: Metadata, detached: bool) -> Tuple[ExitCode, str]: logger.info("Migrating metadata") + if detached: + log_file = generate_log_file_path("metadata_migration") + logger.info(f"Running in detached mode, writing logs to {log_file}") try: - result = metadata.migrate() + result = metadata.migrate(detached_log=log_file if detached else None) except Exception as e: logger.error(f"Failed to migrate metadata: {e}") return ExitCode.FAILURE, f"Failure when migrating metadata: {e}" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py index 85ca0e6cb..8c823b5d9 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py @@ -128,6 +128,7 @@ def execute_benchmark_workload(self, workload: str, elif self.auth_type == AuthMethod.SIGV4: raise NotImplementedError(f"Auth type {self.auth_type} is not currently support for executing " f"benchmark workloads") + # Note -- we should censor the password when logging this command logger.info(f"Running opensearch-benchmark with '{workload}' workload") subprocess.run(f"opensearch-benchmark execute-test --distribution-version=1.0.0 " f"--target-host={self.endpoint} --workload={workload} --pipeline=benchmark-only --test-mode " diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py index 2edeca3d1..3478e666f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py @@ -1,3 +1,4 @@ +import os import subprocess from typing import Optional from cerberus import Validator @@ -83,25 +84,13 @@ def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] # If `from_snapshot` is fully specified, use those values to define snapshot params if config["from_snapshot"] is not None: logger.debug("Using fully specified snapshot config") - self._snapshot_location = 's3' if 's3' in config["from_snapshot"] else 'fs' - self._snapshot_name = config["from_snapshot"]["snapshot_name"] - - if self._snapshot_location == 'fs': - self._repo_path = config["from_snapshot"]["fs"]["repo_path"] - else: - self._s3_uri = config["from_snapshot"]["s3"]["repo_uri"] - self._aws_region = config["from_snapshot"]["s3"]["aws_region"] + self._init_from_config() else: logger.debug("Using independently specified snapshot") - assert isinstance(snapshot, Snapshot) # This is assumable from logic, but specifying it for mypy - self._snapshot_name = snapshot.snapshot_name if isinstance(snapshot, S3Snapshot): - self._snapshot_location = "s3" - self._s3_uri = snapshot.s3_repo_uri - self._aws_region = snapshot.s3_region + self._init_from_s3_snapshot(snapshot) elif isinstance(snapshot, FileSystemSnapshot): - self._snapshot_location = "fs" - self._repo_path = snapshot.repo_path + self._init_from_fs_snapshot(snapshot) if config["from_snapshot"] is not None and "local_dir" in config["from_snapshot"]: self._local_dir = config["from_snapshot"]["local_dir"] @@ -117,7 +106,29 @@ def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] logger.info("Metadata migration configuration defined") - def migrate(self) -> CommandResult: + def _init_from_config(self) -> None: + config = self._config + self._snapshot_location = 's3' if 's3' in config["from_snapshot"] else 'fs' + self._snapshot_name = config["from_snapshot"]["snapshot_name"] + + if self._snapshot_location == 'fs': + self._repo_path = config["from_snapshot"]["fs"]["repo_path"] + else: + self._s3_uri = config["from_snapshot"]["s3"]["repo_uri"] + self._aws_region = config["from_snapshot"]["s3"]["aws_region"] + + def _init_from_s3_snapshot(self, snapshot: S3Snapshot) -> None: + self._snapshot_name = snapshot.snapshot_name + self._snapshot_location = "s3" + self._s3_uri = snapshot.s3_repo_uri + self._aws_region = snapshot.s3_region + + def _init_from_fs_snapshot(self, snapshot: FileSystemSnapshot) -> None: + self._snapshot_name = snapshot.snapshot_name + self._snapshot_location = "fs" + self._repo_path = snapshot.repo_path + + def migrate(self, detached_log=None) -> CommandResult: password_field_index = None command = [ "/root/metadataMigration/bin/MetadataMigration", @@ -164,13 +175,31 @@ def migrate(self) -> CommandResult: display_command = command[:password_field_index] + ["********"] + command[password_field_index:] else: display_command = command - logger.info(f"Creating snapshot with command: {' '.join(display_command)}") + logger.info(f"Migrating metadata with command: {' '.join(display_command)}") + if detached_log: + return self._run_as_detached_process(command, detached_log) + return self._run_as_synchronous_process(command) + + def _run_as_synchronous_process(self, command) -> CommandResult: try: # Pass None to stdout and stderr to not capture output and show in terminal - subprocess.Popen(command, stdout=None, stderr=None, text=True, start_new_session=True) - logger.info(f"Metadata migration for snapshot {self._snapshot_name} initiated") - return CommandResult(success=True, value="Metadata migration initiated") + subprocess.run(command, stdout=None, stderr=None, text=True) + logger.info(f"Metadata migration for snapshot {self._snapshot_name} completed") + return CommandResult(success=True, value="Metadata migration completed") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to migrate metadata: {str(e)}") + return CommandResult(success=False, value=f"Failed to migrate metadata: {str(e)}") + + def _run_as_detached_process(self, command, log_file) -> CommandResult: + try: + with open(log_file, "w") as f: + # Start the process in detached mode + process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, preexec_fn=os.setpgrp) + logger.info(f"Metadata migration process started with PID {process.pid}") + logger.info(f"Metadata migration logs available at {log_file}") + return CommandResult(success=True, value=f"Metadata migration started with PID {process.pid}\n" + f"Logs are being written to {log_file}") except subprocess.CalledProcessError as e: logger.error(f"Failed to create snapshot: {str(e)}") return CommandResult(success=False, value=f"Failed to migrate metadata: {str(e)}") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py index ea31fd3a1..4897e68d2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/utils.py @@ -1,6 +1,7 @@ # define a custom exception for aws api errors from enum import Enum from typing import Dict +from datetime import datetime class AWSAPIError(Exception): @@ -26,3 +27,8 @@ def raise_for_aws_api_error(response: Dict) -> None: class ExitCode(Enum): SUCCESS = 0 FAILURE = 1 + + +def generate_log_file_path(topic: str) -> str: + now = datetime.now().isoformat() + return f"{now}-{topic}.log" From 1945c298ddf8dbfb72a025097aaedfcefba9afd4 Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Tue, 25 Jun 2024 01:19:20 -0600 Subject: [PATCH 18/20] Get metadata in aws working Signed-off-by: Mikayla Thompson --- .../src/main/docker/migrationConsole/Dockerfile | 4 ++-- .../console_api/apps/orchestrator/views.py | 4 +++- .../loadServicesFromParameterStore.sh | 2 +- .../lib/migration-services-yaml.ts | 11 ++++++++++- .../lib/service-stacks/migration-console-stack.ts | 7 ++++--- 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile index 09bae3f3b..f3614031b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile @@ -38,12 +38,12 @@ RUN chmod ug+x /root/loadServicesFromParameterStore.sh COPY lib /root/lib WORKDIR /root/lib/console_link -RUN pipenv install --deploy --ignore-pipfile +RUN pipenv install --system --deploy --ignore-pipfile # Experimental console API, not started by default COPY console_api /root/console_api WORKDIR /root/console_api -RUN pipenv install --deploy --ignore-pipfile +RUN pipenv install --system --deploy --ignore-pipfile WORKDIR /root #CMD pipenv run python manage.py runserver_plus 0.0.0.0:8000 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py index 07b8aaa08..4bbbd7c3e 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py @@ -1,7 +1,6 @@ from console_api.apps.orchestrator.serializers import OpenSearchIngestionCreateRequestSerializer from console_link.models.osi_utils import (InvalidAuthParameters, create_pipeline_from_json, start_pipeline, stop_pipeline) -from console_link.models.migration import MigrationType from rest_framework.decorators import api_view, parser_classes from rest_framework.parsers import JSONParser from rest_framework.response import Response @@ -9,12 +8,15 @@ from pathlib import Path import boto3 import datetime +from enum import Enum import logging logger = logging.getLogger(__name__) PIPELINE_TEMPLATE_PATH = f"{Path(__file__).parents[4]}/osiPipelineTemplate.yaml" +MigrationType = Enum('MigrationType', ['OSI_HISTORICAL_MIGRATION']) + def pretty_request(request, data): headers = '' diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/loadServicesFromParameterStore.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/loadServicesFromParameterStore.sh index 2a6ecd58d..d77c61f47 100755 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/loadServicesFromParameterStore.sh +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/loadServicesFromParameterStore.sh @@ -8,7 +8,7 @@ if [ -z "$MIGRATION_SERVICES_YAML_PARAMETER" ]; then fi # Retrieve the parameter value from AWS Systems Manager Parameter Store -PARAMETER_VALUE=$(aws ssm get-parameters --names "$MIGRATION_SERVICES_YAML_PARAMETER" --query "Parameters[0].Value" --output text) +PARAMETER_VALUE=$(pipenv run aws ssm get-parameters --names "$MIGRATION_SERVICES_YAML_PARAMETER" --query "Parameters[0].Value" --output text) # Check if the retrieval was successful if [ $? -ne 0 ]; then diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts index 4d01c91e6..871fefc58 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts @@ -67,12 +67,20 @@ export class SnapshotYaml { } } +// This component can be much more complicated (specified snapshot details, index/component/template allowlists, etc.) +// but for the time being, we are assuming that the snapshot is the one specified in SnapshotYaml. +export class MetadataMigrationYaml { + from_snapshot: null = null; + min_replicas: number = 1; +} + export class ServicesYaml { source_cluster: ClusterYaml; target_cluster: ClusterYaml; metrics_source: MetricsSourceYaml = new MetricsSourceYaml(); backfill: RFSBackfillYaml | OSIBackfillYaml; snapshot?: SnapshotYaml; + metadata_migration?: MetadataMigrationYaml; stringify(): string { return yaml.stringify({ @@ -80,7 +88,8 @@ export class ServicesYaml { target_cluster: this.target_cluster, metrics_source: this.metrics_source, backfill: this.backfill?.toDict(), - snapshot: this.snapshot?.toDict() + snapshot: this.snapshot?.toDict(), + metadata_migration: this.metadata_migration }, { 'nullStr': '' diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts index b05d6c7c2..252eb31ee 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts @@ -14,7 +14,7 @@ import { import {StreamingSourceType} from "../streaming-source-type"; import {LogGroup, RetentionDays} from "aws-cdk-lib/aws-logs"; import {RemovalPolicy} from "aws-cdk-lib"; -import { ServicesYaml } from "../migration-services-yaml"; +import { MetadataMigrationYaml, ServicesYaml } from "../migration-services-yaml"; import { MigrationServiceCore } from "./migration-service-core"; export interface MigrationConsoleProps extends StackPropsExt { @@ -212,9 +212,9 @@ export class MigrationConsoleStack extends MigrationServiceCore { const artifactS3AnyObjectPath = `${artifactS3Arn}/*`; const artifactS3PublishPolicy = new PolicyStatement({ effect: Effect.ALLOW, - resources: [artifactS3AnyObjectPath], + resources: [artifactS3Arn, artifactS3AnyObjectPath], actions: [ - "s3:PutObject" + "s3:*" ] }) @@ -254,6 +254,7 @@ export class MigrationConsoleStack extends MigrationServiceCore { // TODO: We're not currently supporting auth here, this may need to be handled on the migration console 'no_auth': '' } + servicesYaml.metadata_migration = new MetadataMigrationYaml(); createMigrationStringParameter(this, servicesYaml.stringify(), { ...props, parameter: MigrationSSMParameter.SERVICES_YAML_FILE, From b7598382cfca3b3ba9de096c0e1da46ad46264de Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Tue, 25 Jun 2024 01:46:24 -0600 Subject: [PATCH 19/20] pre-review cleanup Signed-off-by: Mikayla Thompson --- .../java/com/rfs/common/FileSystemRepo.java | 4 --- .../src/main/docker/docker-compose.yml | 32 +------------------ .../lib/console_link/tests/test_metadata.py | 2 -- .../test/migration-console-stack.test.ts | 2 +- 4 files changed, 2 insertions(+), 38 deletions(-) diff --git a/RFS/src/main/java/com/rfs/common/FileSystemRepo.java b/RFS/src/main/java/com/rfs/common/FileSystemRepo.java index 80bfbec2e..54aaa0a3e 100644 --- a/RFS/src/main/java/com/rfs/common/FileSystemRepo.java +++ b/RFS/src/main/java/com/rfs/common/FileSystemRepo.java @@ -1,9 +1,5 @@ package com.rfs.common; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; - import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml index 72fe5eaeb..081132070 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml +++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml @@ -82,25 +82,12 @@ services: image: 'opensearchproject/opensearch:latest' environment: - discovery.type=single-node - - plugins.security.disabled=true - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! - - plugins.security.disabled=true networks: - migrations ports: - "29200:9200" - opensearchsource: - image: opensearchproject/opensearch - environment: - - discovery.type=single-node - - plugins.security.disabled=true - - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! - - ~/.aws:/root/.aws - networks: - - migrations - ports: - - "39200:9200" migration-console: image: 'migrations/migration_console:latest' @@ -110,28 +97,13 @@ services: - sharedReplayerOutput:/shared-replayer-output - ./migrationConsole/lib/console_link/services.yaml:/etc/migration_services.yaml # this is a convenience thing for testing -- it should be removed before this makes it to prod. - - ./migrationConsole/lib/console_link:/root/lib/console_link + # - ./migrationConsole/lib/console_link:/root/lib/console_link - ~/.aws:/root/.aws - - snapshot:/snapshot environment: - MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092 - AWS_PROFILE=default # command: ./runTestBenchmarks.sh - opensearchsource: - image: 'opensearchproject/opensearch:1' - volumes: - - snapshot:/snapshot - environment: - - discovery.type=single-node - - plugins.security.disabled=true - - path.repo=/snapshot - - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123! - - ~/.aws:/root/.aws - networks: - - migrations - ports: - - "39200:9200" volumes: sharedComparatorSqlResults: @@ -140,8 +112,6 @@ volumes: driver: local grafana_data: driver: local - snapshot: - driver: local networks: migrations: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py index e0dd43971..7ee35cd50 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py @@ -1,5 +1,3 @@ - - import pytest from console_link.models.metadata import Metadata from console_link.models.snapshot import S3Snapshot diff --git a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts index 7c5fac46e..c72808449 100644 --- a/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/migration-console-stack.test.ts @@ -89,7 +89,7 @@ test('Test that services yaml parameter is created', () => { // Validates that the file can be parsed as valid yaml and has the expected fields const parsedFromYaml = yaml.parse(yamlFileContents); // Validates that the file has the expected fields - const expectedFields = ['source_cluster', 'target_cluster', 'metrics_source', 'backfill', 'snapshot']; + const expectedFields = ['source_cluster', 'target_cluster', 'metrics_source', 'backfill', 'snapshot', 'metadata_migration']; expect(Object.keys(parsedFromYaml).length).toEqual(expectedFields.length) expect(new Set(Object.keys(parsedFromYaml))).toEqual(new Set(expectedFields)) }) \ No newline at end of file From 53095aa19a4851cf0d93be802c2e435cf071d13d Mon Sep 17 00:00:00 2001 From: Mikayla Thompson Date: Tue, 25 Jun 2024 02:19:05 -0600 Subject: [PATCH 20/20] Add tests Signed-off-by: Mikayla Thompson --- .../console_link/models/metadata.py | 12 +- .../lib/console_link/tests/test_cli.py | 8 + .../lib/console_link/tests/test_metadata.py | 202 +++++++++++++++--- 3 files changed, 183 insertions(+), 39 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py index 3478e666f..3d07280a0 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py @@ -49,7 +49,7 @@ "min_replicas": {"type": "integer", "min": 0, "required": False}, "index_allowlist": list_schema(required=False), "index_template_allowlist": list_schema(required=False), - "component-template-allowlist": list_schema(required=False) + "component_template_allowlist": list_schema(required=False) } @@ -75,7 +75,7 @@ def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] self._min_replicas = config.get("min_replicas", 0) self._index_allowlist = config.get("index_allowlist", None) self._index_template_allowlist = config.get("index_template_allowlist", None) - self._component_template_allowlist = config.get("component-template-allowlist", None) + self._component_template_allowlist = config.get("component_template_allowlist", None) logger.debug(f"Min replicas: {self._min_replicas}") logger.debug(f"Index allowlist: {self._index_allowlist}") logger.debug(f"Index template allowlist: {self._index_template_allowlist}") @@ -163,13 +163,13 @@ def migrate(self, detached_log=None) -> CommandResult: command.append("--target-insecure") if self._index_allowlist: - command.extend(["--index-allowlist", ", ".join(self._index_allowlist)]) + command.extend(["--index-allowlist", ",".join(self._index_allowlist)]) if self._index_template_allowlist: - command.extend(["--index-template-allowlist", ", ".join(self._index_template_allowlist)]) + command.extend(["--index-template-allowlist", ",".join(self._index_template_allowlist)]) if self._component_template_allowlist: - command.extend(["--component-template-allowlist", ", ".join(self._component_template_allowlist)]) + command.extend(["--component-template-allowlist", ",".join(self._component_template_allowlist)]) if password_field_index: display_command = command[:password_field_index] + ["********"] + command[password_field_index:] @@ -184,7 +184,7 @@ def migrate(self, detached_log=None) -> CommandResult: def _run_as_synchronous_process(self, command) -> CommandResult: try: # Pass None to stdout and stderr to not capture output and show in terminal - subprocess.run(command, stdout=None, stderr=None, text=True) + subprocess.run(command, stdout=None, stderr=None, text=True, check=True) logger.info(f"Metadata migration for snapshot {self._snapshot_name} completed") return CommandResult(success=True, value="Metadata migration completed") except subprocess.CalledProcessError as e: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py index 81c24ce7b..d3f5ac9b8 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_cli.py @@ -168,3 +168,11 @@ def test_cli_cat_indices_e2e(runner, env): assert 'TARGET CLUSTER' in result.output assert source_cat_indices in result.output assert target_cat_indices in result.output + + +def test_cli_metadata_migrate(runner, env, mocker): + mock = mocker.patch("subprocess.run") + result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'metadata', 'migrate'], + catch_exceptions=True) + mock.assert_called_once() + assert result.exit_code == 0 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py index 7ee35cd50..003f385de 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_metadata.py @@ -1,9 +1,35 @@ import pytest +from console_link.models.cluster import AuthMethod from console_link.models.metadata import Metadata -from console_link.models.snapshot import S3Snapshot +from console_link.models.snapshot import FileSystemSnapshot, S3Snapshot from tests.utils import create_valid_cluster +@pytest.fixture() +def s3_snapshot(): + snapshot_config = { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + } + } + return S3Snapshot(snapshot_config, create_valid_cluster(auth_type=AuthMethod.NO_AUTH), + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + + +@pytest.fixture() +def fs_snapshot(): + snapshot_config = { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/repo" + } + } + return FileSystemSnapshot(snapshot_config, create_valid_cluster(auth_type=AuthMethod.NO_AUTH), + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + + def test_metadata_init_with_fully_specified_config_succeeds(): config = { "from_snapshot": { @@ -21,7 +47,7 @@ def test_metadata_init_with_fully_specified_config_succeeds(): "index_template_allowlist": [ "my_index_template", "my_second_index_template" ], - "component-template-allowlist": [ + "component_template_allowlist": [ "my_component_template", "my_second_component_template" ] } @@ -72,34 +98,18 @@ def test_metadata_init_with_partial_snapshot_config_no_external_snapshot_fails() assert 'required field' == excinfo.value.args[0]['from_snapshot'][0]['s3'][0]['repo_uri'][0] -def test_metadata_init_with_minimal_config_and_external_snapshot_succeeds(): - snapshot_config = { - "snapshot_name": "reindex_from_snapshot", - "s3": { - "repo_uri": "s3://my-bucket", - "aws_region": "us-east-1" - } - } - snapshot = S3Snapshot(snapshot_config, create_valid_cluster(), create_valid_cluster()) +def test_metadata_init_with_minimal_config_and_external_snapshot_succeeds(s3_snapshot): config = { "from_snapshot": None, } - metadata = Metadata(config, create_valid_cluster(), snapshot) + metadata = Metadata(config, create_valid_cluster(), s3_snapshot) assert metadata._config == config - assert metadata._snapshot_name == snapshot_config["snapshot_name"] + assert metadata._snapshot_name == s3_snapshot.snapshot_name assert isinstance(metadata, Metadata) @pytest.mark.skip("Need to tighten up the schema specification to catch this case") -def test_metadata_init_with_partial_config_and_external_snapshot_fails(): - snapshot_config = { - "snapshot_name": "reindex_from_snapshot", - "s3": { - "repo_uri": "s3://my-bucket", - "aws_region": "us-east-1" - } - } - snapshot = S3Snapshot(snapshot_config, create_valid_cluster(), create_valid_cluster()) +def test_metadata_init_with_partial_config_and_external_snapshot_fails(s3_snapshot): config = { "from_snapshot": { "local_dir": "/tmp/s3", @@ -107,21 +117,13 @@ def test_metadata_init_with_partial_config_and_external_snapshot_fails(): } } with pytest.raises(ValueError) as excinfo: - Metadata(config, create_valid_cluster(), snapshot) + Metadata(config, create_valid_cluster(), s3_snapshot) print(excinfo) assert 's3' in excinfo.value.args[0]['from_snapshot'][0] assert 'required field' == excinfo.value.args[0]['from_snapshot'][0]['s3'][0] -def test_full_config_and_snapshot_gives_priority_to_config(): - snapshot_config = { - "snapshot_name": "reindex_from_snapshot", - "s3": { - "repo_uri": "s3://my-bucket", - "aws_region": "us-east-1" - } - } - snapshot = S3Snapshot(snapshot_config, create_valid_cluster(), create_valid_cluster()) +def test_full_config_and_snapshot_gives_priority_to_config(s3_snapshot): config = { "from_snapshot": { "local_dir": "/tmp/s3", @@ -132,9 +134,143 @@ def test_full_config_and_snapshot_gives_priority_to_config(): }, } } - metadata = Metadata(config, create_valid_cluster(), snapshot) + metadata = Metadata(config, create_valid_cluster(), s3_snapshot) assert isinstance(metadata, Metadata) assert metadata._snapshot_name == config["from_snapshot"]["snapshot_name"] assert metadata._s3_uri == config["from_snapshot"]["s3"]["repo_uri"] assert metadata._aws_region == config["from_snapshot"]["s3"]["aws_region"] assert metadata._local_dir == config["from_snapshot"]["local_dir"] + + +def test_metadata_with_s3_snapshot_makes_correct_subprocess_call(mocker): + config = { + "from_snapshot": { + "snapshot_name": "reindex_from_snapshot", + "local_dir": "/tmp/s3", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + target = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + metadata = Metadata(config, target, None) + + mock = mocker.patch("subprocess.run") + metadata.migrate() + + mock.assert_called_once_with([ + "/root/metadataMigration/bin/MetadataMigration", + "--snapshot-name", config["from_snapshot"]["snapshot_name"], + "--target-host", target.endpoint, + "--min-replicas", '0', + "--s3-local-dir", config["from_snapshot"]["local_dir"], + "--s3-repo-uri", config["from_snapshot"]["s3"]["repo_uri"], + "--s3-region", config["from_snapshot"]["s3"]["aws_region"], + '--target-insecure' + ], stdout=None, stderr=None, text=True, check=True + ) + + +def test_metadata_with_fs_snapshot_makes_correct_subprocess_call(mocker): + config = { + "from_snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "path/to/repo" + }, + } + } + target = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + metadata = Metadata(config, target, None) + + mock = mocker.patch("subprocess.run") + metadata.migrate() + + mock.assert_called_once_with([ + "/root/metadataMigration/bin/MetadataMigration", + "--snapshot-name", config["from_snapshot"]["snapshot_name"], + "--target-host", target.endpoint, + "--min-replicas", '0', + "--file-system-repo-path", config["from_snapshot"]["fs"]["repo_path"], + '--target-insecure' + ], stdout=None, stderr=None, text=True, check=True + ) + + +def test_metadata_with_min_replicas_makes_correct_subprocess_call(mocker): + config = { + "from_snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "path/to/repo" + }, + }, + "min_replicas": 2 + } + target = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + metadata = Metadata(config, target, None) + + mock = mocker.patch("subprocess.run") + metadata.migrate() + + mock.assert_called_once_with([ + "/root/metadataMigration/bin/MetadataMigration", + "--snapshot-name", config["from_snapshot"]["snapshot_name"], + "--target-host", target.endpoint, + "--min-replicas", '2', + "--file-system-repo-path", config["from_snapshot"]["fs"]["repo_path"], + '--target-insecure' + ], stdout=None, stderr=None, text=True, check=True + ) + + +def test_metadata_with_allowlists_makes_correct_subprocess_call(mocker): + config = { + "from_snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "path/to/repo" + }, + }, + "index_allowlist": ["index1", "index2"], + "index_template_allowlist": ["index_template1", "index_template2"], + "component_template_allowlist": ["component_template1", "component_template2"] + } + target = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + metadata = Metadata(config, target, None) + + mock = mocker.patch("subprocess.run") + metadata.migrate() + + mock.assert_called_once_with([ + "/root/metadataMigration/bin/MetadataMigration", + "--snapshot-name", config["from_snapshot"]["snapshot_name"], + "--target-host", target.endpoint, + "--min-replicas", '0', + "--file-system-repo-path", config["from_snapshot"]["fs"]["repo_path"], + "--target-insecure", + "--index-allowlist", "index1,index2", + "--index-template-allowlist", "index_template1,index_template2", + "--component-template-allowlist", "component_template1,component_template2" + ], stdout=None, stderr=None, text=True, check=True + ) + + +def test_metadata_migrate_detached_makes_correct_subprocess_call(mocker): + config = { + "from_snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "path/to/repo" + }, + }, + "min_replicas": 2, + } + target = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + metadata = Metadata(config, target, None) + + mock = mocker.patch("subprocess.Popen") + metadata.migrate(detached_log="/tmp/log_file.log") + + mock.assert_called_once()