Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Console] Status for RFS Backfill #771

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,18 @@


@backfill_group.command(name="status")
@click.option('--deep-check', is_flag=True, help='Perform a deep status check of the backfill')
@click.pass_obj
def status_backfill_cmd(ctx):
exitcode, message = logic_backfill.status(ctx.env.backfill)
def status_backfill_cmd(ctx, deep_check):
logger.info(f"Called `console backfill status`, with {deep_check=}")
exitcode, message = logic_backfill.status(ctx.env.backfill, deep_check=deep_check)

Check warning on line 238 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L237-L238

Added lines #L237 - L238 were not covered by tests
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)
if len(message) == 2:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For consistency, can we just have the message be a single string with '\n' separating the lines?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, this was ugly; I like that better.

click.echo(message[0])
click.echo(message[1])

Check warning on line 243 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L241-L243

Added lines #L241 - L243 were not covered by tests
else:
click.echo(message)

Check warning on line 245 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L245

Added line #L245 was not covered by tests


# ##################### REPLAY ###################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,16 @@
return ExitCode.FAILURE, "Backfill scale failed." + "\n" + result.display()


def status(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.info("Getting backfill status")
def status(backfill: Backfill, deep_check, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.info(f"Getting backfill status with {deep_check=}")

Check warning on line 123 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py#L123

Added line #L123 was not covered by tests
try:
status = backfill.get_status(*args, **kwargs)
status = backfill.get_status(deep_check, *args, **kwargs)

Check warning on line 125 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py#L125

Added line #L125 was not covered by tests
except NotImplementedError:
logger.error(f"Status is not implemented for backfill {type(backfill).__name__}")
return ExitCode.FAILURE, f"Status is not implemented for backfill: {type(backfill).__name__}"
except Exception as e:
logger.error(f"Failed to get status of backfill: {e}")
return ExitCode.FAILURE, f"Failure when getting status of backfill: {type(e).__name__} {e}"
if status:
if status.success:

Check warning on line 132 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py#L132

Added line #L132 was not covered by tests
return ExitCode.SUCCESS, status.value
return ExitCode.FAILURE, "Backfill status retrieval failed." + "\n" + status
return ExitCode.FAILURE, "Backfill status retrieval failed." + "\n" + status.value

Check warning on line 134 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/backfill.py#L134

Added line #L134 was not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
}


BackfillStatus = Enum("BackfillStatus", ["NOT_STARTED", "RUNNING", "STOPPED", "FAILED"])
BackfillStatus = Enum("BackfillStatus", ["NOT_STARTED", "STARTING", "RUNNING", "STOPPED", "FAILED"])


class Backfill(ABC):
Expand Down Expand Up @@ -50,7 +50,7 @@ def stop(self, *args, **kwargs) -> CommandResult:
pass

@abstractmethod
def get_status(self, *args, **kwargs) -> BackfillStatus:
def get_status(self, *args, **kwargs) -> CommandResult:
"""Return a status"""
pass

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from console_link.models.osi_utils import (create_pipeline_from_env, start_pipeline, stop_pipeline,
OpenSearchIngestionMigrationProps)
from console_link.models.cluster import Cluster
from console_link.models.backfill_base import Backfill, BackfillStatus
from console_link.models.backfill_base import Backfill
from console_link.models.command_result import CommandResult
from typing import Dict
from cerberus import Validator
Expand Down Expand Up @@ -92,7 +92,7 @@ def stop(self, pipeline_name=None):
pipeline_name = self.osi_props.pipeline_name
stop_pipeline(osi_client=self.osi_client, pipeline_name=pipeline_name)

def get_status(self, *args, **kwargs) -> BackfillStatus:
def get_status(self, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def scale(self, units: int, *args, **kwargs) -> CommandResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
def stop(self, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def get_status(self, *args, **kwargs) -> BackfillStatus:
def get_status(self, *args, **kwargs) -> CommandResult:
raise NotImplementedError()

def scale(self, units: int, *args, **kwargs) -> CommandResult:
Expand Down Expand Up @@ -96,3 +96,40 @@
def scale(self, units: int, *args, **kwargs) -> CommandResult:
logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
return self.ecs_client.set_desired_count(units)

def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
    """Report the state of the RFS backfill from its ECS instance counts.

    Args:
        deep_check: When truthy, also ask ``_get_detailed_status`` for a
            detailed summary and append it to the status text.
        *args, **kwargs: Accepted for signature compatibility; unused here.

    Returns:
        A failed CommandResult when the instance counts could not be
        fetched. Otherwise a successful CommandResult whose value is a
        ``(BackfillStatus, status_string)`` tuple: RUNNING if any instance
        is running, STARTING if any is pending, STOPPED otherwise.
    """
    # TODO: add unit tests; the coverage check on this change reported the
    # body of this method as not covered.
    logger.info(f"Getting status of RFS backfill, with {deep_check=}")
    instance_statuses = self.ecs_client.get_instance_statuses()
    if not instance_statuses:
        return CommandResult(False, "Failed to get instance statuses")

    status_string = str(instance_statuses)
    if deep_check:
        # Bind up front: if the detailed query raises, the name must still
        # exist so the check below falls through to the basic status instead
        # of raising UnboundLocalError.
        shard_status = None
        try:
            shard_status = self._get_detailed_status()
        except Exception as e:
            # Best effort: the deep check is optional extra detail.
            logger.error(f"Failed to get detailed status: {e}")
        if shard_status:
            status_string += f"\n{shard_status}"

    if instance_statuses.running > 0:
        return CommandResult(True, (BackfillStatus.RUNNING, status_string))
    elif instance_statuses.pending > 0:
        return CommandResult(True, (BackfillStatus.STARTING, status_string))
    return CommandResult(True, (BackfillStatus.STOPPED, status_string))

Check warning on line 119 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py#L115-L119

Added lines #L115 - L119 were not covered by tests

def _get_detailed_status(self):
    """Query the target cluster for the detailed backfill status.

    Searches ``/.migrations_working_state/_search`` for documents that have
    an ``expiration`` field and no ``completedAt`` field; the total hit
    count is reported as the number of remaining shards.

    Returns:
        ``"Remaining shards: <count>"`` when the response body contains a
        "hits" entry; otherwise the parsed response body unchanged.
    """
    # TODO: add unit tests; the coverage check on this change reported the
    # body of this method as not covered.
    status_query = {"query": {
        "bool": {
            "must": [{"exists": {"field": "expiration"}}],
            "must_not": [{"exists": {"field": "completedAt"}}]
        }
    }}
    response = self.target_cluster.call_api("/.migrations_working_state/_search", json_body=status_query)
    r_body = response.json()
    logger.debug(f"Raw response: {r_body}")
    if "hits" not in r_body:
        # Hand back the body that was already parsed instead of decoding the
        # response a second time.
        return r_body

    hits = r_body["hits"]
    logger.info(f"Hits on detailed status query: {hits}")
    logger.info(f"Sample of remaining shards: {[hit['_id'] for hit in hits['hits']]}")
    return f"Remaining shards: {hits['total']['value']}"

Check warning on line 135 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/backfill_rfs.py#L128-L135

Added lines #L128 - L135 were not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
elif 'sigv4' in config:
self.auth_type = AuthMethod.SIGV4

def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None) -> requests.Response:
def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None, json_body=None) -> requests.Response:
"""
Calls an API on the cluster.
"""
Expand All @@ -100,13 +100,21 @@
else:
raise NotImplementedError(f"Auth type {self.auth_type} not implemented")

if json_body is not None:
# headers = {'Content-type': 'application/json'}
data = json_body

Check warning on line 105 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/cluster.py#L105

Added line #L105 was not covered by tests
else:
# headers = None
data = None

logger.info(f"Making api call to {self.endpoint}{path}")
r = requests.request(
method.name,
f"{self.endpoint}{path}",
verify=(not self.allow_insecure),
auth=auth,
timeout=timeout
timeout=timeout,
json=data
)
logger.debug(f"Cluster API call request: {r.request}")
r.raise_for_status()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import NamedTuple, Optional

import boto3

Expand All @@ -9,6 +10,15 @@
logger = logging.getLogger(__name__)


class InstanceStatues(NamedTuple):
    """Running, pending and desired instance counts for a service.

    Every count defaults to 0. The class name's spelling is kept as-is
    because other code refers to it by this name.
    """
    running: int = 0
    pending: int = 0
    desired: int = 0

    def __str__(self):
        # One "Label=value" entry per line, in running/pending/desired order.
        parts = [
            f"Running={self.running}",
            f"Pending={self.pending}",
            f"Desired={self.desired}",
        ]
        return "\n".join(parts)

Check warning on line 19 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/ecs_service.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/ecs_service.py#L19

Added line #L19 was not covered by tests


class ECSService:
def __init__(self, cluster_name, service_name, aws_region=None):
self.cluster_name = cluster_name
Expand All @@ -25,7 +35,6 @@
service=self.service_name,
desiredCount=desired_count
)
logger.debug(f"Response from update_service: {response}")

try:
raise_for_aws_api_error(response)
Expand All @@ -38,3 +47,22 @@
desired_count = response["service"]["desiredCount"]
return CommandResult(True, f"Service {self.service_name} set to {desired_count} desired count."
f" Currently {running_count} running and {pending_count} pending.")

def get_instance_statuses(self) -> Optional[InstanceStatues]:
    """Fetch the running, pending and desired counts of the ECS service.

    Returns:
        An InstanceStatues built from the first service entry of the
        ``describe_services`` response, or None when the response is
        reported as an AWS API error or contains no service entry.
    """
    # TODO: add unit tests; the coverage check on this change reported the
    # body of this method as not covered.
    logger.info(f"Getting instance statuses for service {self.service_name}")
    response = self.client.describe_services(
        cluster=self.cluster_name,
        services=[self.service_name]
    )
    try:
        raise_for_aws_api_error(response)
    except AWSAPIError as e:
        logger.error(f"Error getting instance statuses: {e}")
        return None

    # A response can come back without any service entry (ECS DescribeServices
    # reports services it could not describe under "failures"). Return None,
    # as for any other lookup failure, instead of raising IndexError/KeyError.
    services = response.get("services")
    if not services:
        logger.error(f"No service description returned for {self.service_name}: "
                     f"{response.get('failures')}")
        return None

    service = services[0]
    return InstanceStatues(
        running=service["runningCount"],
        pending=service["pendingCount"],
        desired=service["desiredCount"]
    )
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ def create(self, *args, **kwargs) -> CommandResult:
logger.error(f"Failed to create snapshot: {str(e)}")
return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}")

def status(self, *args, **kwargs) -> CommandResult:
deep_check = kwargs.get('deep_check', False)
def status(self, *args, deep_check=False, **kwargs) -> CommandResult:
if deep_check:
return get_snapshot_status_full(self.source_cluster, self.snapshot_name)
return get_snapshot_status(self.source_cluster, self.snapshot_name)
Expand Down
Loading