
[Fetch Migration] Migrate index templates and component templates as a part of metadata migration #477

Merged 4 commits on Jan 25, 2024
34 changes: 34 additions & 0 deletions FetchMigration/python/component_template_info.py
@@ -0,0 +1,34 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#


from typing import Optional

# Constants
NAME_KEY = "name"
DEFAULT_TEMPLATE_KEY = "component_template"


# Class that encapsulates component template information
class ComponentTemplateInfo:
    # Private member variables
    __name: str
    __template_def: Optional[dict]

    def __init__(self, template_payload: dict, template_key: str = DEFAULT_TEMPLATE_KEY):
Collaborator:
When would/could there be a different template_key?

Member Author:
IndexTemplateInfo is a subclass of this class and overrides the template_key value:

https://github.com/kartg/opensearch-migrations/blob/917f273917867a37a4b81a9b613e14e27887a94c/FetchMigration/python/index_template_info.py#L23

This was done because the structures of the index-template and component-template responses are largely identical, differing only in the template key string.
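For illustration, a minimal sketch of the two response shapes (hypothetical payloads, not part of this diff; assumes component_template_info.py is importable):

component_payload = {
    "name": "shared-settings",
    "component_template": {"template": {"settings": {"number_of_shards": 1}}},
}
index_payload = {
    "name": "logs-template",
    "index_template": {"index_patterns": ["logs-*"], "composed_of": ["shared-settings"]},
}

# ComponentTemplateInfo reads "component_template" by default; the subclass
# passes "index_template" as the template_key instead.
info = ComponentTemplateInfo(component_payload)
idx_info = ComponentTemplateInfo(index_payload, template_key="index_template")
assert info.get_name() == "shared-settings"
assert idx_info.get_template_definition()["composed_of"] == ["shared-settings"]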

        self.__name = template_payload[NAME_KEY]
        self.__template_def = None
        if template_key in template_payload:
            self.__template_def = template_payload[template_key]

    def get_name(self) -> str:
        return self.__name

    def get_template_definition(self) -> Optional[dict]:
        return self.__template_def
61 changes: 61 additions & 0 deletions FetchMigration/python/index_operations.py
@@ -12,15 +12,21 @@
import jsonpath_ng
import requests

Comment on lines 13 to 14
Collaborator:
(on the filename, not this line). Index is both a noun and a verb, so it isn't clear to me whether this applies to operations on indices (nouns) or if this is a set of operations of a particular type (verb). index_management.py, or something that could shake the ambiguity would probably help new contributors.

Member Author:
Point taken. Note that I'd like to do this in a follow-up PR to keep the diff in this one focused on the template migration changes.

from component_template_info import ComponentTemplateInfo
from endpoint_info import EndpointInfo
from index_doc_count import IndexDocCount
from index_template_info import IndexTemplateInfo

# Constants
SETTINGS_KEY = "settings"
MAPPINGS_KEY = "mappings"
ALIASES_KEY = "aliases"
COUNT_KEY = "count"
__INDEX_KEY = "index"
__COMPONENT_TEMPLATE_LIST_KEY = "component_templates"
__INDEX_TEMPLATE_LIST_KEY = "index_templates"
__INDEX_TEMPLATES_PATH = "/_index_template"
__COMPONENT_TEMPLATES_PATH = "/_component_template"
__ALL_INDICES_ENDPOINT = "*"
# (ES 7+) size=0 avoids the "hits" payload to reduce the response size since we're only interested in the aggregation,
# and track_total_hits forces an accurate doc-count
@@ -106,3 +112,58 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
        return IndexDocCount(total, count_map)
    except RuntimeError as e:
        raise RuntimeError(f"Failed to fetch doc_count: {e!s}")


def __fetch_templates(endpoint: EndpointInfo, path: str, root_key: str, factory) -> set:
    url: str = endpoint.add_path(path)
    # raises RuntimeError in case of any request errors
Collaborator:
Can you scope this to something more specific to disambiguate errors up the call stack? Leaving them as ConnectionError, HTTPError, etc., or preserving the exception (rather than stringifying it) would be preferable. SonarQube has some rules against using RuntimeExceptions in Java, and I think the principles apply to other languages.

Collaborator:

What happens if there's an exception? It looks like it propagates up? It might be fair that this code doesn't handle it, but if there are compound instructions with dependencies, it will be harder to guarantee success without a more granular retry strategy. Saying that everything executes in < 5s and that these calls have never been observed to fail is probably a valid reason NOT to invest time in making them granular, though.

Member Author (kartg, Jan 24, 2024):

Thanks for this comment. I just learned that Python 3 supports exception chaining, so I'll incorporate that here instead of stringifying the underlying exception. Note that I'll continue to use RuntimeError since it's the only built-in exception type that's applicable here. I agree that exception typing in Fetch could be more specific - if you believe that's valuable, I can pick this up as a follow-up refactoring/improvement task.

What happens if there's an exception? It looks like it propagates up?

That's correct. This was an intentional decision to fail the metadata migration if templates could not be fetched (and therefore migrated)

Member Author:

Exception chaining added with this commit
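As a quick illustration of the chaining pattern adopted here (a generic Python sketch, not code from this PR):

try:
    raise ConnectionError("connection refused")
except ConnectionError as e:
    try:
        raise RuntimeError("Failed to fetch template metadata") from e
    except RuntimeError as chained:
        # The original error travels along as __cause__ and appears in the traceback
        assert chained.__cause__ is e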

    try:
        resp = __send_get_request(url, endpoint)
        result = set()
        if root_key in resp.json():
            for template in resp.json()[root_key]:
                result.add(factory(template))
        return result
    except RuntimeError as e:
        # Chain the underlying exception as a cause
        raise RuntimeError("Failed to fetch template metadata from cluster endpoint") from e


def fetch_all_component_templates(endpoint: EndpointInfo) -> set[ComponentTemplateInfo]:
    try:
        # raises RuntimeError in case of any request errors
        return __fetch_templates(endpoint, __COMPONENT_TEMPLATES_PATH, __COMPONENT_TEMPLATE_LIST_KEY,
                                 lambda t: ComponentTemplateInfo(t))
    except RuntimeError as e:
        raise RuntimeError("Failed to fetch component template metadata") from e


def fetch_all_index_templates(endpoint: EndpointInfo) -> set[IndexTemplateInfo]:
    try:
        # raises RuntimeError in case of any request errors
        return __fetch_templates(endpoint, __INDEX_TEMPLATES_PATH, __INDEX_TEMPLATE_LIST_KEY,
                                 lambda t: IndexTemplateInfo(t))
    except RuntimeError as e:
        raise RuntimeError("Failed to fetch index template metadata") from e


def __create_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo, template_path: str) -> dict:
    failures = dict()
    for template in templates:
        template_endpoint = endpoint.add_path(template_path + "/" + template.get_name())
        try:
            resp = requests.put(template_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
                                json=template.get_template_definition(), timeout=__TIMEOUT_SECONDS)
            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            failures[template.get_name()] = e
    # Loop completed, return failures if any
    return failures


def create_component_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo) -> dict:
    return __create_templates(templates, endpoint, __COMPONENT_TEMPLATES_PATH)


def create_index_templates(templates: set[IndexTemplateInfo], endpoint: EndpointInfo) -> dict:
    return __create_templates(templates, endpoint, __INDEX_TEMPLATES_PATH)
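Taken together, the fetch/create pairs compose into a simple copy loop. A hedged usage sketch (assumes source_endpoint and target_endpoint are EndpointInfo instances built elsewhere, e.g. from the pipeline YAML):

templates = fetch_all_component_templates(source_endpoint)
failures = create_component_templates(templates, target_endpoint)
for name, err in failures.items():
    # Each failure maps a template name to the underlying requests exception
    print(f"Template {name} failed: {err}")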
23 changes: 23 additions & 0 deletions FetchMigration/python/index_template_info.py
@@ -0,0 +1,23 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from component_template_info import ComponentTemplateInfo

# Constants
INDEX_TEMPLATE_KEY = "index_template"


# Class that encapsulates index template information from a cluster.
# Subclass of ComponentTemplateInfo because the structure of an index
# template is identical to a component template, except that it uses
# a different template key. Also, index templates can be "composed" of
# one or more component templates.
class IndexTemplateInfo(ComponentTemplateInfo):
    def __init__(self, template_payload: dict):
        super().__init__(template_payload, INDEX_TEMPLATE_KEY)
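Since index templates can reference component templates via composed_of, an index template is only valid on the target once its components exist; this is why template_migration in the diff below creates component templates first. A small sketch with a hypothetical payload:

payload = {
    "name": "logs-template",
    "index_template": {"index_patterns": ["logs-*"], "composed_of": ["shared-settings"]},
}
info = IndexTemplateInfo(payload)
# The subclass parses the same payload shape, just under the "index_template" key
assert info.get_name() == "logs-template"
assert "composed_of" in info.get_template_definition()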
115 changes: 78 additions & 37 deletions FetchMigration/python/metadata_migration.py
@@ -15,6 +15,7 @@
 import endpoint_utils
 import index_operations
 import utils
+from endpoint_info import EndpointInfo
 from index_diff import IndexDiff
 from metadata_migration_params import MetadataMigrationParams
 from metadata_migration_result import MetadataMigrationResult
@@ -50,60 +51,100 @@ def print_report(diff: IndexDiff, total_doc_count: int):  # pragma no cover
     logging.info("Target document count: " + str(total_doc_count))
 
 
-def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
-    # Sanity check
-    if not args.report and len(args.output_file) == 0:
-        raise ValueError("No output file specified")
-    # Parse and validate pipelines YAML file
-    with open(args.config_file_path, 'r') as pipeline_file:
-        dp_config = yaml.safe_load(pipeline_file)
-    # We expect the Data Prepper pipeline to only have a single top-level value
-    pipeline_config = next(iter(dp_config.values()))
-    # Raises a ValueError if source or sink definitions are missing
-    endpoint_utils.validate_pipeline(pipeline_config)
-    source_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config,
-                                                                                 endpoint_utils.SOURCE_KEY)
-    target_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config,
-                                                                                 endpoint_utils.SINK_KEY)
+def index_metadata_migration(source: EndpointInfo, target: EndpointInfo,
+                             args: MetadataMigrationParams) -> MetadataMigrationResult:
     result = MetadataMigrationResult()
     # Fetch indices
-    source_indices = index_operations.fetch_all_indices(source_endpoint_info)
+    source_indices = index_operations.fetch_all_indices(source)
     # If source indices is empty, return immediately
     if len(source_indices.keys()) == 0:
         return result
-    target_indices = index_operations.fetch_all_indices(target_endpoint_info)
-    # Compute index differences and print report
+    target_indices = index_operations.fetch_all_indices(target)
+    # Compute index differences and create result object
     diff = IndexDiff(source_indices, target_indices)
     if diff.identical_indices:
         # Identical indices with zero documents on the target are eligible for migration
-        target_doc_count = index_operations.doc_count(diff.identical_indices, target_endpoint_info)
+        target_doc_count = index_operations.doc_count(diff.identical_indices, target)
         # doc_count only returns indices that have non-zero counts, so the difference in responses
         # gives us the set of identical, empty indices
         result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys())
         diff.set_identical_empty_indices(result.migration_indices)
     if diff.indices_to_create:
         result.migration_indices.update(diff.indices_to_create)
     if result.migration_indices:
-        doc_count_result = index_operations.doc_count(result.migration_indices, source_endpoint_info)
+        doc_count_result = index_operations.doc_count(result.migration_indices, source)
         result.target_doc_count = doc_count_result.total
     # Print report
     if args.report:
         print_report(diff, result.target_doc_count)
-    if result.migration_indices:
-        # Write output YAML
-        if len(args.output_file) > 0:
-            write_output(dp_config, result.migration_indices, args.output_file)
-            logging.debug("Wrote output YAML pipeline to: " + args.output_file)
-        if not args.dryrun:
-            index_data = dict()
-            for index_name in diff.indices_to_create:
-                index_data[index_name] = source_indices[index_name]
-            failed_indices = index_operations.create_indices(index_data, target_endpoint_info)
-            fail_count = len(failed_indices)
-            if fail_count > 0:
-                logging.error(f"Failed to create {fail_count} of {len(index_data)} indices")
-                for failed_index_name, error in failed_indices.items():
-                    logging.error(f"Index name {failed_index_name} failed: {error!s}")
-                raise RuntimeError("Metadata migration failed, index creation unsuccessful")
+    # Create index metadata on target
+    if result.migration_indices and not args.dryrun:
+        index_data = dict()
+        for index_name in diff.indices_to_create:
+            index_data[index_name] = source_indices[index_name]
+        failed_indices = index_operations.create_indices(index_data, target)
+        fail_count = len(failed_indices)
+        if fail_count > 0:
+            logging.error(f"Failed to create {fail_count} of {len(index_data)} indices")
+            for failed_index_name, error in failed_indices.items():
+                logging.error(f"Index name {failed_index_name} failed: {error!s}")
+            raise RuntimeError("Metadata migration failed, index creation unsuccessful")
     return result


+# Returns true if there were failures, false otherwise
+def __log_template_failures(failures: dict, target_count: int) -> bool:
+    fail_count = len(failures)
+    if fail_count > 0:
+        logging.error(f"Failed to create {fail_count} of {target_count} templates")
+        for failed_template_name, error in failures.items():
+            logging.error(f"Template name {failed_template_name} failed: {error!s}")
+        # Return true to signal failures
+        return True
+    else:
+        # No failures, return false
+        return False
+
+
+# Raises RuntimeError if component/index template migration fails
+def template_migration(source: EndpointInfo, target: EndpointInfo):
+    # Fetch and migrate component templates first
+    templates = index_operations.fetch_all_component_templates(source)
+    failures = index_operations.create_component_templates(templates, target)
+    if not __log_template_failures(failures, len(templates)):
+        # Only migrate index templates if component template migration had no failures
+        templates = index_operations.fetch_all_index_templates(source)
+        failures = index_operations.create_index_templates(templates, target)
+        if __log_template_failures(failures, len(templates)):
+            raise RuntimeError("Failed to create some index templates")
+    else:
+        raise RuntimeError("Failed to create some component templates, aborting index template creation")
+
+
+def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
+    # Sanity check
+    if not args.report and len(args.output_file) == 0:
+        raise ValueError("No output file specified")
+    # Parse and validate pipelines YAML file
+    with open(args.config_file_path, 'r') as pipeline_file:
+        dp_config = yaml.safe_load(pipeline_file)
+    # We expect the Data Prepper pipeline to only have a single top-level value
+    pipeline_config = next(iter(dp_config.values()))
+    # Raises a ValueError if source or sink definitions are missing
+    endpoint_utils.validate_pipeline(pipeline_config)
+    source_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config,
+                                                                                 endpoint_utils.SOURCE_KEY)
+    target_endpoint_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config,
+                                                                                 endpoint_utils.SINK_KEY)
+    result = index_metadata_migration(source_endpoint_info, target_endpoint_info, args)
+    # Write output YAML
+    if result.migration_indices and len(args.output_file) > 0:
+        write_output(dp_config, result.migration_indices, args.output_file)
+        logging.debug("Wrote output YAML pipeline to: " + args.output_file)
+    if not args.dryrun:
+        # Create component and index templates, may raise RuntimeError
+        template_migration(source_endpoint_info, target_endpoint_info)
+    # Finally return result
+    return result


@@ -135,6 +176,6 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
     arg_parser.add_argument("--report", "-r", action="store_true",
                             help="Print a report of the index differences")
     arg_parser.add_argument("--dryrun", action="store_true",
-                            help="Skips the actual creation of indices on the target cluster")
+                            help="Skips the actual creation of metadata on the target cluster")
     namespace = arg_parser.parse_args()
     run(MetadataMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))