-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Console] Metadata Migration #756
Changes from all commits
b5559fe
5b12d5b
ce00934
40bb291
77c52e3
8b59c29
81b7d16
519b845
cc82c49
6a6e7a8
5ef5302
b376543
a75c5f6
2f2ca1b
293ab14
6364600
862e15c
13dcf5a
d56d1b7
c790c52
f1558b6
1945c29
b759838
53095aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,22 @@ | ||
from console_api.apps.orchestrator.serializers import OpenSearchIngestionCreateRequestSerializer | ||
from console_link.models.osi_utils import (InvalidAuthParameters, create_pipeline_from_json, start_pipeline, | ||
stop_pipeline) | ||
from console_link.models.migration import MigrationType | ||
from rest_framework.decorators import api_view, parser_classes | ||
from rest_framework.parsers import JSONParser | ||
from rest_framework.response import Response | ||
from rest_framework import status | ||
from pathlib import Path | ||
import boto3 | ||
import datetime | ||
from enum import Enum | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# Path to the OSI pipeline template, resolved relative to this file's location
# (4 directories up). Assumes the repo layout is stable — TODO confirm.
PIPELINE_TEMPLATE_PATH = f"{Path(__file__).parents[4]}/osiPipelineTemplate.yaml"

# NOTE(review): this local Enum shadows the MigrationType imported from
# console_link.models.migration above. Per the PR discussion, the imported
# enum/file no longer exists, so this local definition is a stopgap —
# post-demo, add API tests and determine what this should actually reference.
MigrationType = Enum('MigrationType', ['OSI_HISTORICAL_MIGRATION'])
|
||
Comment on lines
+18
to
+19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lewijacn -- neither the file it's being imported from nor the enum referenced exist anymore. I added it locally for the time being, because I don't think this is potentially changeable data at this point. Post-demo, we should 1/ add tests to the api so we don't accidentally break it again, 2/ figure out what this should be calling. |
||
|
||
def pretty_request(request, data): | ||
headers = '' | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from typing import Tuple | ||
|
||
from console_link.models.metadata import Metadata | ||
from console_link.models.utils import ExitCode, generate_log_file_path | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def migrate(metadata: Metadata, detached: bool) -> Tuple[ExitCode, str]:
    """Run a metadata migration and report the outcome.

    When `detached` is true, a log-file path is generated and the migration is
    started as a detached background process writing to it; otherwise it runs
    synchronously. Returns an (exit code, message) pair; any exception raised
    by the migration is caught and surfaced as a FAILURE result.
    """
    logger.info("Migrating metadata")
    log_file = generate_log_file_path("metadata_migration") if detached else None
    if log_file is not None:
        logger.info(f"Running in detached mode, writing logs to {log_file}")
    try:
        result = metadata.migrate(detached_log=log_file)
    except Exception as e:
        logger.error(f"Failed to migrate metadata: {e}")
        return ExitCode.FAILURE, f"Failure when migrating metadata: {e}"

    exit_code = ExitCode.SUCCESS if result.success else ExitCode.FAILURE
    return exit_code, result.value
Check warning on line 22 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py Codecov / codecov/patchTrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/metadata.py#L22
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
import os | ||
import subprocess | ||
from typing import Optional | ||
from cerberus import Validator | ||
import tempfile | ||
import logging | ||
|
||
from console_link.models.command_result import CommandResult | ||
from console_link.models.schema_tools import list_schema | ||
from console_link.models.cluster import AuthMethod, Cluster | ||
from console_link.models.snapshot import S3Snapshot, Snapshot, FileSystemSnapshot | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# Cerberus schema for the "from_snapshot" config section. The section itself is
# required but nullable: a null value means the snapshot is defined elsewhere
# (as its own service) rather than inline here.
FROM_SNAPSHOT_SCHEMA = {
    "type": "dict",
    # In the future, there should be a "from_snapshot" and a "from_live_cluster" option, but for now only snapshot is
    # supported, so this is required. It _can_ be null, but that requires a snapshot to be defined on its own in
    # the services.yaml file.
    "required": True,
    "nullable": True,
    "schema": {
        "snapshot_name": {"type": "string", "required": True},
        # Optional local working directory for the snapshot; a temp dir is
        # generated when omitted.
        "local_dir": {"type": "string", "required": False},
        # Exactly one of "s3" or "fs" is expected (see note below — not yet
        # enforced by the schema).
        "s3": {
            'type': 'dict',
            "required": False,
            'schema': {
                'repo_uri': {'type': 'string', 'required': True},
                'aws_region': {'type': 'string', 'required': True},
            }
        },
        "fs": {
            'type': 'dict',
            "required": False,
            'schema': {
                'repo_path': {'type': 'string', 'required': True},
            }
        }
    },
    # We _should_ have the check below, but I need to figure out how to combine it with a potentially
    # nullable block (like this one)
    # 'check_with': contains_one_of({'s3', 'fs'})

}

# Top-level config schema for a metadata migration service entry.
SCHEMA = {
    "from_snapshot": FROM_SNAPSHOT_SCHEMA,
    "min_replicas": {"type": "integer", "min": 0, "required": False},
    "index_allowlist": list_schema(required=False),
    "index_template_allowlist": list_schema(required=False),
    "component_template_allowlist": list_schema(required=False)
}
|
||
|
||
def generate_tmp_dir(name: str) -> str:
    """Create and return a fresh temporary directory named after *name*."""
    prefix = f"migration-{name}-"
    return tempfile.mkdtemp(prefix=prefix)
|
||
|
||
class Metadata:
    """Drives a metadata migration from an existing snapshot into a target cluster.

    Snapshot parameters come from one of two sources: a fully specified
    "from_snapshot" config section, or an independently defined Snapshot
    object (S3- or filesystem-backed). The migration itself shells out to the
    MetadataMigration CLI tool, either synchronously or as a detached process.
    """

    def __init__(self, config, target_cluster: Cluster, snapshot: Optional[Snapshot] = None):
        logger.debug(f"Initializing Metadata with config: {config}")
        # Validate the raw config dict against the module-level cerberus SCHEMA.
        v = Validator(SCHEMA)
        if not v.validate(config):
            logger.error(f"Invalid config: {v.errors}")
            raise ValueError(v.errors)
        self._config = config
        self._target_cluster = target_cluster
        self._snapshot = snapshot

        # A snapshot must be available from at least one of the two sources.
        if (not snapshot) and (config["from_snapshot"] is None):
            raise ValueError("No snapshot is specified or can be assumed "
                             "for the metadata migration to use.")

        self._min_replicas = config.get("min_replicas", 0)
        self._index_allowlist = config.get("index_allowlist", None)
        self._index_template_allowlist = config.get("index_template_allowlist", None)
        self._component_template_allowlist = config.get("component_template_allowlist", None)
        logger.debug(f"Min replicas: {self._min_replicas}")
        logger.debug(f"Index allowlist: {self._index_allowlist}")
        logger.debug(f"Index template allowlist: {self._index_template_allowlist}")
        logger.debug(f"Component template allowlist: {self._component_template_allowlist}")

        # If `from_snapshot` is fully specified, use those values to define snapshot params
        if config["from_snapshot"] is not None:
            logger.debug("Using fully specified snapshot config")
            self._init_from_config()
        else:
            logger.debug("Using independently specified snapshot")
            if isinstance(snapshot, S3Snapshot):
                self._init_from_s3_snapshot(snapshot)
            elif isinstance(snapshot, FileSystemSnapshot):
                self._init_from_fs_snapshot(snapshot)
            else:
                # Fail fast: previously an unsupported snapshot type left
                # self._snapshot_name unset and surfaced later as an
                # AttributeError in generate_tmp_dir.
                raise ValueError(f"Unsupported snapshot type: {type(snapshot)}")

        if config["from_snapshot"] is not None and "local_dir" in config["from_snapshot"]:
            self._local_dir = config["from_snapshot"]["local_dir"]
        else:
            self._local_dir = generate_tmp_dir(self._snapshot_name)

        logger.debug(f"Snapshot name: {self._snapshot_name}")
        if self._snapshot_location == 's3':
            logger.debug(f"S3 URI: {self._s3_uri}")
            logger.debug(f"AWS region: {self._aws_region}")
        else:
            logger.debug(f"Local dir: {self._local_dir}")

        logger.info("Metadata migration configuration defined")

    def _init_from_config(self) -> None:
        """Populate snapshot fields from the "from_snapshot" config section."""
        config = self._config
        self._snapshot_location = 's3' if 's3' in config["from_snapshot"] else 'fs'
        self._snapshot_name = config["from_snapshot"]["snapshot_name"]

        if self._snapshot_location == 'fs':
            self._repo_path = config["from_snapshot"]["fs"]["repo_path"]
        else:
            self._s3_uri = config["from_snapshot"]["s3"]["repo_uri"]
            self._aws_region = config["from_snapshot"]["s3"]["aws_region"]

    def _init_from_s3_snapshot(self, snapshot: S3Snapshot) -> None:
        """Populate snapshot fields from an S3-backed Snapshot object."""
        self._snapshot_name = snapshot.snapshot_name
        self._snapshot_location = "s3"
        self._s3_uri = snapshot.s3_repo_uri
        self._aws_region = snapshot.s3_region

    def _init_from_fs_snapshot(self, snapshot: FileSystemSnapshot) -> None:
        """Populate snapshot fields from a filesystem-backed Snapshot object."""
        self._snapshot_name = snapshot.snapshot_name
        self._snapshot_location = "fs"
        self._repo_path = snapshot.repo_path

    def migrate(self, detached_log=None) -> CommandResult:
        """Build and run the MetadataMigration CLI command.

        When `detached_log` is a path, the process is started detached with
        output redirected to that file; otherwise it runs synchronously.
        Raises ValueError when basic-auth credentials are incomplete.
        """
        password_field_index = None
        command = [
            "/root/metadataMigration/bin/MetadataMigration",
            # Initially populate only the required params
            "--snapshot-name", self._snapshot_name,
            "--target-host", self._target_cluster.endpoint,
            "--min-replicas", str(self._min_replicas)
        ]
        if self._snapshot_location == 's3':
            command.extend([
                "--s3-local-dir", self._local_dir,
                "--s3-repo-uri", self._s3_uri,
                "--s3-region", self._aws_region,
            ])
        elif self._snapshot_location == 'fs':
            command.extend([
                "--file-system-repo-path", self._repo_path,
            ])

        # NOTE(review): comparing auth_details (apparently a dict, given the
        # .get() calls below) against an AuthMethod enum member looks suspect
        # — confirm against Cluster's interface before relying on this branch.
        if self._target_cluster.auth_details == AuthMethod.BASIC_AUTH:
            try:
                command.extend([
                    "--target-username", self._target_cluster.auth_details.get("username"),
                    "--target-password", self._target_cluster.auth_details.get("password")
                ])
                # Index of the password value (last element just appended).
                password_field_index = len(command) - 1
                logger.info("Using basic auth for target cluster")
            except KeyError as e:
                raise ValueError(f"Missing required auth details for target cluster: {e}")

        if self._target_cluster.allow_insecure:
            command.append("--target-insecure")

        if self._index_allowlist:
            command.extend(["--index-allowlist", ",".join(self._index_allowlist)])

        if self._index_template_allowlist:
            command.extend(["--index-template-allowlist", ",".join(self._index_template_allowlist)])

        if self._component_template_allowlist:
            command.extend(["--component-template-allowlist", ",".join(self._component_template_allowlist)])

        if password_field_index is not None:
            # Replace (not merely insert before) the password so the cleartext
            # value never reaches the logs. The previous slicing
            # (command[idx:]) kept the real password in the logged command.
            display_command = (command[:password_field_index] + ["********"] +
                               command[password_field_index + 1:])
        else:
            display_command = command
        logger.info(f"Migrating metadata with command: {' '.join(display_command)}")

        if detached_log:
            return self._run_as_detached_process(command, detached_log)
        return self._run_as_synchronous_process(command)

    def _run_as_synchronous_process(self, command) -> CommandResult:
        """Run the migration in the foreground, returning success/failure."""
        try:
            # Pass None to stdout and stderr to not capture output and show in terminal
            subprocess.run(command, stdout=None, stderr=None, text=True, check=True)
            logger.info(f"Metadata migration for snapshot {self._snapshot_name} completed")
            return CommandResult(success=True, value="Metadata migration completed")
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to migrate metadata: {str(e)}")
            return CommandResult(success=False, value=f"Failed to migrate metadata: {str(e)}")

    def _run_as_detached_process(self, command, log_file) -> CommandResult:
        """Start the migration detached, redirecting output to `log_file`."""
        try:
            with open(log_file, "w") as f:
                # Start the process in detached mode (own process group so it
                # survives this console session).
                process = subprocess.Popen(command, stdout=f, stderr=subprocess.STDOUT, preexec_fn=os.setpgrp)
                logger.info(f"Metadata migration process started with PID {process.pid}")
                logger.info(f"Metadata migration logs available at {log_file}")
                return CommandResult(success=True, value=f"Metadata migration started with PID {process.pid}\n"
                                     f"Logs are being written to {log_file}")
        except (OSError, subprocess.SubprocessError) as e:
            # Popen never raises CalledProcessError (there is no check step),
            # so the original except clause was dead code; catch spawn/open
            # failures (missing binary, bad log path) instead. Also fixes the
            # copy-pasted "Failed to create snapshot" log message.
            logger.error(f"Failed to migrate metadata: {str(e)}")
            return CommandResult(success=False, value=f"Failed to migrate metadata: {str(e)}")
Check warning on line 205 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py Codecov / codecov/patchTrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/metadata.py#L203-L205
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is included because I was getting this error while deploying to aws. It seems odd that I would run into this and no one else would, so it's possible there's another explanation.
cc: @peternied
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you have this commit when you got that error? #753