Skip to content

Commit

Permalink
Merge branch 'main' into FixDaemonThreadsFromPreventingDeath2
Browse files Browse the repository at this point in the history
  • Loading branch information
gregschohn authored Jun 25, 2024
2 parents c4d682f + f6da3bc commit 91c64bd
Show file tree
Hide file tree
Showing 15 changed files with 432 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import console_link.logic.backfill as logic_backfill
import console_link.logic.snapshot as logic_snapshot
import console_link.logic.metadata as logic_metadata
import console_link.logic.replay as logic_replay

from console_link.models.utils import ExitCode
from console_link.environment import Environment
Expand Down Expand Up @@ -237,7 +238,61 @@ def status_backfill_cmd(ctx):
raise click.ClickException(message)
click.echo(message)

# ##################### METRICS ###################

# ##################### REPLAY ###################

@cli.group(name="replay")
@click.pass_obj
def replay_group(ctx):
    """All actions related to replaying data"""
    # Runs before any `replay` subcommand: refuse to continue when the loaded
    # environment has no replayer configured.
    replayer = ctx.env.replay
    if replayer is None:
        raise click.UsageError("Replay is not set")


@replay_group.command(name="describe")
@click.pass_obj
def describe_replay_cmd(ctx):
    # Print the replayer's description; ctx.json selects JSON instead of the
    # default YAML rendering done by logic_replay.describe.
    description = logic_replay.describe(ctx.env.replay, as_json=ctx.json)
    click.echo(description)


@replay_group.command(name="start")
@click.pass_obj
def start_replay_cmd(ctx):
    # Start the configured replayer: echo the message on success, otherwise
    # surface it as a ClickException.
    status_code, output = logic_replay.start(ctx.env.replay)
    if status_code == ExitCode.SUCCESS:
        click.echo(output)
        return
    raise click.ClickException(output)


@replay_group.command(name="stop")
@click.pass_obj
def stop_replay_cmd(ctx):
    # Stop the configured replayer: echo the message on success, otherwise
    # surface it as a ClickException.
    status_code, output = logic_replay.stop(ctx.env.replay)
    if status_code == ExitCode.SUCCESS:
        click.echo(output)
        return
    raise click.ClickException(output)


@replay_group.command(name="scale")
@click.argument("units", type=int, required=True)
@click.pass_obj
def scale_replay_cmd(ctx, units: int):
    # Scale the configured replayer to `units`: echo the message on success,
    # otherwise surface it as a ClickException.
    status_code, output = logic_replay.scale(ctx.env.replay, units)
    if status_code == ExitCode.SUCCESS:
        click.echo(output)
        return
    raise click.ClickException(output)


@replay_group.command(name="status")
@click.pass_obj
def status_replay_cmd(ctx):
    # Query the configured replayer's status: echo the message on success,
    # otherwise surface it as a ClickException.
    status_code, output = logic_replay.status(ctx.env.replay)
    if status_code == ExitCode.SUCCESS:
        click.echo(output)
        return
    raise click.ClickException(output)


# ##################### METADATA ###################


@cli.group(name="metadata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from console_link.logic.backfill import get_backfill
from console_link.models.backfill_base import Backfill
from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot
from console_link.models.replayer_base import Replayer
from console_link.models.replayer_ecs import ECSReplayer
import yaml
from cerberus import Validator

Expand All @@ -20,14 +22,21 @@ def get_snapshot(config: Dict, source_cluster: Cluster):
return S3Snapshot(config, source_cluster)


def get_replayer(config: Dict):
    """Build the replayer described by ``config``.

    Only configs containing an ``'ecs'`` key are supported; they produce an
    ``ECSReplayer``.

    Raises:
        ValueError: if ``config`` has no ``'ecs'`` key.
    """
    if 'ecs' not in config:
        raise ValueError("Invalid replayer config")
    return ECSReplayer(config)


# Top-level layout of the environment config file. Every section is a nested
# dict whose own shape is checked by the model that consumes it; only
# `target_cluster` is mandatory here.
# NOTE(review): presumably consumed by the cerberus Validator imported in this
# module -- the call site is outside this view.
SCHEMA = {
    "source_cluster": {"type": "dict", "required": False},
    "target_cluster": {"type": "dict", "required": True},
    "replayer": {"type": "dict", "required": False},
    "backfill": {"type": "dict", "required": False},
    "metrics_source": {"type": "dict", "required": False},
    "snapshot": {"type": "dict", "required": False},
    # Listed exactly once: a second copy of this entry without a separating
    # comma made the literal a syntax error.
    "metadata_migration": {"type": "dict", "required": False},
    "replay": {"type": "dict", "required": False},
}


Expand All @@ -38,6 +47,7 @@ class Environment:
metrics_source: Optional[MetricsSource] = None
snapshot: Optional[Snapshot] = None
metadata: Optional[Metadata] = None
replay: Optional[Replayer] = None

def __init__(self, config_file: str):
logger.info(f"Loading config file: {config_file}")
Expand Down Expand Up @@ -77,6 +87,10 @@ def __init__(self, config_file: str):
else:
logger.info("No backfill provided")

if 'replay' in self.config:
self.replay: Replayer = get_replayer(self.config["replay"])
logger.info(f"Replay initialized: {self.replay}")

if 'snapshot' in self.config:
self.snapshot: Snapshot = get_snapshot(self.config["snapshot"],
source_cluster=self.source_cluster)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json
import logging
from typing import Tuple
from console_link.models.utils import ExitCode
from console_link.models.replayer_base import Replayer
import yaml


logger = logging.getLogger(__name__)


def describe(replayer: Replayer, as_json=False) -> str:
    """Serialize ``replayer.describe()`` to text.

    Returns a JSON string when ``as_json`` is truthy, otherwise a YAML string
    produced by ``yaml.safe_dump``.
    """
    description = replayer.describe()
    serialize = json.dumps if as_json else yaml.safe_dump
    return serialize(description)


def start(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
    """Start the replayer and report the outcome as an (exit code, message) pair.

    Extra positional and keyword arguments are forwarded to ``replayer.start``.
    Exceptions raised by the replayer are logged and converted into an
    ``ExitCode.FAILURE`` result instead of propagating.
    """
    # Announce the action up front, matching stop/scale/status in this module.
    logger.info("Starting replayer")
    try:
        result = replayer.start(*args, **kwargs)
    except NotImplementedError:
        message = f"Start is not implemented for replayer {type(replayer).__name__}"
        logger.error(message)
        return ExitCode.FAILURE, message
    except Exception as e:
        # Deliberately broad: this is the boundary that turns any replayer
        # failure into a user-facing message.
        logger.error(f"Failed to start replayer: {e}")
        return ExitCode.FAILURE, f"Failure when starting replayer: {type(e).__name__} {e}"

    if result.success:
        return ExitCode.SUCCESS, "Replayer started successfully." + "\n" + result.display()
    return ExitCode.FAILURE, "Replayer start failed." + "\n" + result.display()


def stop(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
    """Stop the replayer and report the outcome as an (exit code, message) pair.

    Extra positional and keyword arguments are forwarded to ``replayer.stop``.
    Exceptions raised by the replayer are logged and converted into an
    ``ExitCode.FAILURE`` result instead of propagating.
    """
    logger.info("Stopping replayer")
    try:
        outcome = replayer.stop(*args, **kwargs)
    except NotImplementedError:
        unsupported = f"Stop is not implemented for replayer {type(replayer).__name__}"
        logger.error(unsupported)
        return ExitCode.FAILURE, unsupported
    except Exception as e:
        logger.error(f"Failed to stop replayer: {e}")
        return ExitCode.FAILURE, f"Failure when stopping replayer: {type(e).__name__} {e}"

    if not outcome.success:
        return ExitCode.FAILURE, "Replayer stop failed." + "\n" + outcome.display()
    return ExitCode.SUCCESS, "Replayer stopped successfully." + "\n" + outcome.display()


def scale(replayer: Replayer, units: int, *args, **kwargs) -> Tuple[ExitCode, str]:
    """Scale the replayer to ``units`` and report an (exit code, message) pair.

    ``units`` and any extra arguments are forwarded to ``replayer.scale``.
    Exceptions raised by the replayer are logged and converted into an
    ``ExitCode.FAILURE`` result instead of propagating.
    """
    logger.info(f"Scaling replayer to {units} units")
    try:
        outcome = replayer.scale(units, *args, **kwargs)
    except NotImplementedError:
        unsupported = f"Scale is not implemented for replayer {type(replayer).__name__}"
        logger.error(unsupported)
        return ExitCode.FAILURE, unsupported
    except Exception as e:
        logger.error(f"Failed to scale replayer: {e}")
        return ExitCode.FAILURE, f"Failure when scaling replayer: {type(e).__name__} {e}"

    if not outcome.success:
        return ExitCode.FAILURE, "Replayer scale failed." + "\n" + outcome.display()
    return ExitCode.SUCCESS, "Replayer scaled successfully." + "\n" + outcome.display()


def status(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
    """Fetch the replayer's status and report an (exit code, message) pair.

    Extra positional and keyword arguments are forwarded to
    ``replayer.get_status``. Exceptions raised by the replayer are logged and
    converted into an ``ExitCode.FAILURE`` result instead of propagating.

    On success the message is the status member's name (e.g. ``"RUNNING"``).
    """
    logger.info("Getting replayer status")
    try:
        # Named distinctly so the local does not shadow this function.
        replay_status = replayer.get_status(*args, **kwargs)
    except NotImplementedError:
        logger.error(f"Status is not implemented for replayer {type(replayer).__name__}")
        return ExitCode.FAILURE, f"Status is not implemented for replayer: {type(replayer).__name__}"
    except Exception as e:
        logger.error(f"Failed to get status of replayer: {e}")
        return ExitCode.FAILURE, f"Failure when getting status of replayer: {type(e).__name__} {e}"

    if not replay_status:
        # Format instead of concatenating: the falsy value (typically None) is
        # not a str, so `"..." + replay_status` would raise TypeError.
        return ExitCode.FAILURE, f"Replayer status retrieval failed.\n{replay_status}"
    # Use the member name: this function promises a str message, and enum
    # values may be plain integers.
    return ExitCode.SUCCESS, replay_status.name
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ def start(self, *args, **kwargs) -> CommandResult:
def stop(self, *args, **kwargs) -> CommandResult:
    """Stop the RFS backfill by asking the ECS client for zero instances."""
    logger.info("Stopping RFS backfill by setting desired count to 0 instances")
    stopped_count = 0
    return self.ecs_client.set_desired_count(stopped_count)

def scale(self, units: int, *args, **kwargs) -> CommandResult:
    """Scale the RFS backfill by asking the ECS client for ``units`` instances."""
    logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
    ecs = self.ecs_client
    return ecs.set_desired_count(units)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from enum import Enum
from typing import Dict
from abc import ABC, abstractmethod

from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult

from cerberus import Validator

# Validation schema for the `docker` flavour of a replay config: a dict with
# an optional string `socket`.
DOCKER_REPLAY_SCHEMA = {
    "type": "dict",
    "schema": {
        "socket": {"type": "string", "required": False},
    },
}

# Validation schema for the `ecs` flavour of a replay config: cluster and
# service names are mandatory strings, the AWS region is optional.
ECS_REPLAY_SCHEMA = {
    "type": "dict",
    "schema": {
        "cluster_name": {"type": "string", "required": True},
        "service_name": {"type": "string", "required": True},
        "aws_region": {"type": "string", "required": False},
    },
}

# Schema for the whole `replay` section: one of the deployment flavours
# (`docker` / `ecs`, enforced through `contains_one_of`) plus an optional
# integer `scale` of at least 1.
SCHEMA = {
    "replay": {
        "type": "dict",
        "schema": {
            "docker": DOCKER_REPLAY_SCHEMA,
            "ecs": ECS_REPLAY_SCHEMA,
            "scale": {"type": "integer", "required": False, "min": 1},
        },
        "check_with": contains_one_of({"docker", "ecs"}),
    },
}


class ReplayStatus(Enum):
    """Statuses that a replayer's ``get_status`` may return."""
    # Values are numbered from 1 in declaration order.
    NOT_STARTED = 1
    RUNNING = 2
    STOPPED = 3
    FAILED = 4


class Replayer(ABC):
    """
    Abstract base for components that replay data from kafka to a target cluster.

    The constructor validates the supplied config against the module-level
    SCHEMA and records the default scale; subclasses implement the lifecycle
    operations.
    """
    def __init__(self, config: Dict) -> None:
        validator = Validator(SCHEMA)
        self.config = config
        # The schema is rooted at a "replay" key, so wrap the section before validating.
        is_valid = validator.validate({"replay": self.config})
        if not is_valid:
            raise ValueError("Invalid config file for replay", validator.errors)
        # Unit count used when none is given explicitly; falls back to 1.
        self.default_scale = self.config.get("scale", 1)

    @abstractmethod
    def start(self, *args, **kwargs) -> CommandResult:
        """Begin running the replayer. After running start, the user should be able to assume that--barring exceptions
        or failures--their data will begin playing against the target cluster."""

    @abstractmethod
    def stop(self, *args, **kwargs) -> CommandResult:
        """Stop or pause the replay. This does not make guarantees about resumeability."""

    @abstractmethod
    def get_status(self, *args, **kwargs) -> ReplayStatus:
        """Return a status"""

    @abstractmethod
    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        """Scale the replayer to ``units``; what a unit is depends on the implementation."""

    def describe(self) -> Dict:
        """Return the config this replayer was constructed with."""
        return self.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Dict
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService
from console_link.models.replayer_base import Replayer, ReplayStatus

import logging

logger = logging.getLogger(__name__)


class ECSReplayer(Replayer):
    """Replayer driven through an ECS service: start, stop and scale all set the service's desired count."""

    def __init__(self, config: Dict) -> None:
        super().__init__(config)
        ecs_settings = self.config["ecs"]
        self.ecs_config = ecs_settings
        # The region is optional; None is passed through when it is absent.
        self.ecs_client = ECSService(
            ecs_settings["cluster_name"],
            ecs_settings["service_name"],
            ecs_settings.get("aws_region"),
        )

    def start(self, *args, **kwargs) -> CommandResult:
        """Start the replayer by requesting ``default_scale`` instances."""
        logger.info(f"Starting ECS replayer by setting desired count to {self.default_scale} instances")
        desired = self.default_scale
        return self.ecs_client.set_desired_count(desired)

    def stop(self, *args, **kwargs) -> CommandResult:
        """Stop the replayer by requesting zero instances."""
        logger.info("Stopping ECS replayer by setting desired count to 0 instances")
        desired = 0
        return self.ecs_client.set_desired_count(desired)

    def get_status(self, *args, **kwargs) -> ReplayStatus:
        """Not supported for ECS replayers; always raises NotImplementedError."""
        raise NotImplementedError()

    def scale(self, units: int, *args, **kwargs) -> CommandResult:
        """Scale the replayer by requesting ``units`` instances."""
        logger.info(f"Scaling ECS replayer by setting desired count to {units} instances")
        desired = units
        return self.ecs_client.set_desired_count(desired)
Loading

0 comments on commit 91c64bd

Please sign in to comment.