Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add console_link library to migration console, implement cat-indices #657

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
pipeline {
environment {
// GIT_URL = 'https://github.com/mikaylathompson/opensearch-migrations.git'
GIT_URL = 'https://github.com/opensearch-project/opensearch-migrations.git'
GIT_BRANCH = 'main'
STAGE = 'aws-integ'
}

agent any

parameters {
booleanParam(name: 'USE_LOCAL_WORKSPACE', defaultValue: false, description: 'Use local workspace for the build')
string(name: 'BRANCH_NAME', defaultValue: 'main', description: 'Branch to build from')
}

stages {
stage('Checkout') {
agent any
steps {
script {
if (params.USE_LOCAL_WORKSPACE) {
sh "/copyGitTrackedFiles.sh /opensearch-migrations-src ."
} else {
git branch: "${params.BRANCH_NAME}", url: "${env.GIT_URL}"
}
}
}
}

stage('Test Caller Identity') {
agent any
steps {
sh 'aws sts get-caller-identity'
}
}

stage('Build') {
agent any
steps {
timeout(time: 1, unit: 'HOURS') {
dir('TrafficCapture') {
sh './gradlew build -x test'
}
}
}
}

stage('Deploy') {
steps {
dir('test') {
sh 'sudo usermod -aG docker $USER'
sh 'sudo newgrp docker'
sh "sudo ./awsE2ESolutionSetup.sh --stage ${env.STAGE} --migrations-git-url ${env.GIT_URL} --migrations-git-branch ${env.GIT_BRANCH}"
}
}
}

stage('Integ Tests') {
steps {
dir('test') {
script {
def time = new Date().getTime()
def uniqueId = "integ_min_${time}_${currentBuild.number}"
sh "sudo ./awsRunIntegTests.sh --stage ${env.STAGE} --migrations-git-url ${env.GIT_URL} --migrations-git-branch ${env.GIT_BRANCH} --unique-id ${uniqueId}"
}
}

}
}
}
// post {
// always {
// dir('test') {
// sh "sudo ./awsE2ESolutionSetup.sh --stage ${env.STAGE} --run-post-actions"
// }
// }
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ services:
- migrations
volumes:
- sharedReplayerOutput:/shared-replayer-output
- ./migrationConsole/console_link/services.yaml:/etc/migration_services.yaml
- ./migrationConsole/console_link:/root/console_link
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed since image already has the library?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not needed, but it means that I can make code changes locally and instantly test it on the docker container. What do you think about making a note that this is a temporary thing, but keeping it in for now? This seems like the sort of thing that's okay on the e2e branch, but I want a reminder to take it out before we merge to main

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah gotcha, yeah I'm fine with a comment for now

environment:
- MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092
# command: ./runTestBenchmarks.sh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ RUN chmod ug+x /root/showFetchMigrationCommand.sh
RUN chmod ug+x /root/osiMigration.py
RUN chmod ug+x /root/kafka-tools/kafkaExport.sh

COPY console_link /root/console_link
RUN pip install -e /root/console_link/

CMD tail -f /dev/null
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import click
import console_link.logic as logic
from console_link.logic.instantiation import Environment


class Context(object):
def __init__(self, config_file) -> None:
self.config_file = config_file
self.env = Environment(config_file)


@click.group()
@click.option('--config-file', default='/etc/migration_services.yaml', help='Path to config file')
@click.pass_context
def cli(ctx, config_file):
ctx.obj = Context(config_file)


@cli.command(name="cat-indices")
@click.pass_obj
def cat_indices_cmd(ctx):
"""Simple program that calls `_cat/indices` on both a source and target cluster."""
click.echo("SOURCE CLUSTER")
click.echo(logic.clusters.cat_indices(ctx.env.source_cluster))
click.echo("TARGET CLUSTER")
click.echo(logic.clusters.cat_indices(ctx.env.target_cluster))
pass


@cli.command(name="start-replayer")
@click.pass_obj
def start_replayer_cmd(ctx):
logic.services.start_replayer(ctx.env.replayer)


if __name__ == '__main__':
cli()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import console_link.logic.clusters
import console_link.logic.instantiation
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from console_link.models.cluster import Cluster


def cat_indices(cluster: Cluster, as_json=False):
as_json_suffix = "?format=json" if as_json else ""
cat_indices_path = f"/_cat/indices{as_json_suffix}"
r = cluster.call_api(cat_indices_path)
return r.json() if as_json else r.content
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from console_link.models.cluster import Cluster
import yaml
from cerberus import Validator

SCHEMA = {
'source_cluster': {
'type': 'dict',
'required': True
},
'target_cluster': {
'type': 'dict',
'required': True
},
'replayer': {
'type': 'dict',
'required': False
},
'backfill': {
'type': 'dict',
'required': False
}
}


class Environment:
def __init__(self, config_file: str):
# TODO: add validation of overall yaml structure here, and details in each component.

self.config_file = config_file
with open(self.config_file) as f:
self.config = yaml.safe_load(f)
v = Validator(SCHEMA)
if not v.validate(self.config):
raise ValueError(f"Invalid config file: {v.errors}")

self.source_cluster = Cluster(self.config['source_cluster'])

# At some point, target and replayers should be stored as pairs, but for the time being
# we can probably assume one target cluster.
self.target_cluster = Cluster(self.config['target_cluster'])
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Dict, Optional
from enum import Enum
import requests
from requests.auth import HTTPBasicAuth
from cerberus import Validator

requests.packages.urllib3.disable_warnings()

AuthMethod = Enum("AuthMethod", ["BASIC", "SIGV4"])
HttpMethod = Enum("HttpMethod", ["GET", "POST", "PUT", "DELETE"])

SCHEMA = {
'endpoint': {
'type': 'string',
'required': True
},
'allow_insecure': {
'type': 'boolean',
'required': False
},
'authorization': {
'type': 'dict',
'required': False,
'schema': {
'type': {
'type': 'string',
'required': True,
'allowed': [e.name.lower() for e in AuthMethod]
},
'details': {
'type': 'dict'
}
}
}
}


class Cluster():
"""
An elasticcsearch or opensearch cluster.
"""
endpoint: str = ""
auth_type: Optional[AuthMethod] = None
auth_details: Optional[Dict] = None

def __init__(self, config: Dict) -> None:
v = Validator(SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for cluster", v.errors)

self.endpoint = config["endpoint"]
if self.endpoint.startswith("https"):
self.allow_insecure = config.get("allow_insecure", False)
self.auth_type = AuthMethod[config["authorization"]["type"].upper()]
self.auth_details = config["authorization"]["details"]
pass

def call_api(self, path, method: HttpMethod = HttpMethod.GET) -> Dict:
"""
Calls an API on the cluster.
"""
if self.auth_type == AuthMethod.BASIC:
auth = HTTPBasicAuth(self.auth_details["username"], self.auth_details["password"])
elif self.auth_type is None:
auth = None
else:
raise NotImplementedError(f"Auth type {self.auth_type} not implemented")

r = requests.request(method.name, f"{self.endpoint}{path}", verify=(not self.allow_insecure), auth=auth)
r.raise_for_status()
return r
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from enum import Enum
from typing import Dict
import boto3

DeploymentType = Enum('DeploymentType', ['DOCKER', 'ECS'])

SCHEMA = {
'deployment_type': {
'type': 'string',
'required': True
}
}


class BaseReplayer():
"""
The Replayer base class
"""
def __init__(self, config: Dict) -> None:
self.config = config
v = Validator(SCHEMA)
if not v.validate(config):
raise ValueError(f"Invalid config file for cluster: {v.errors}")

def start_replayer(self):
"""
Starts the replayer.
"""
raise NotImplementedError

def stop_replayer(self):
"""
Stops the replayer.
"""
raise NotImplementedError


class LocalReplayer(BaseReplayer):
def start_replayer(self):
pass

def stop_replayer(self):
pass


class ECSReplayer(BaseReplayer):
client = boto3.client('ecs')

def __init__(self, config: Dict) -> None:
super().__init__(config)
self.cluster_name = config['cluster_name']
self.service_name = config['service_name']

def start_replayer(self) -> None:
self.client.update_service(
cluster=self.cluster_name,
service=self.service_name,
desiredCount=1
)

def stop_replayer(self) -> None:
self.client.update_service(
cluster=self.cluster_name,
service=self.service_name,
desiredCount=0
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
source_cluster:
endpoint: "https://capture-proxy-es:9200"
allow_insecure: true
authorization:
type: "basic"
details:
username: "admin"
password: "admin"
target_cluster:
endpoint: "https://opensearchtarget:9200"
allow_insecure: true
authorization:
type: "basic"
details:
username: "admin"
password: "myStrongPassword123!"
replayer:
deployment_type: "docker"
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from setuptools import setup, find_packages

setup(
name='console_link',
version='1.0.0',
description='A Python module to create a console application from a Python script',
packages=find_packages(exclude=('tests')),
install_requires=[
'requests',
'boto3',
'pyyaml',
'Click',
'cerberus'
],
entry_points={
'console_scripts': [
'console = console_link.cli:cli',
],
},
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Topic :: Software Development :: Build Tools',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: MIT License',
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytest
Loading
Loading