Merge branch 'main' into console-lib-rfs-status
Signed-off-by: Mikayla Thompson <thomika@amazon.com>
mikaylathompson committed Jun 26, 2024
2 parents a88e07c + 3fec905 commit b1a89de
Showing 7 changed files with 43 additions and 28 deletions.
@@ -143,7 +143,7 @@ def main():
)
throughput_message = f"Request throughput over the last 5 seconds: {throughput:.2f} req/sec"

clear_output_message = "\033c" if not args.no_clear_output else ""
clear_output_message = "\033[H\033[J" if not args.no_clear_output else ""

logger.info(f"{clear_output_message}" +
f"{request_message}\n" +
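For context on the change above, both strings are terminal control sequences: "\033c" is the ANSI RIS (reset to initial state) code, which resets the whole terminal, while "\033[H\033[J" only homes the cursor and erases the visible screen, which is friendlier for a status view that redraws in a loop. A minimal sketch of that pattern, using illustrative names that are not taken from this repository:

import time

HOME_AND_CLEAR = "\033[H\033[J"  # ESC[H moves the cursor home, ESC[J erases to the end of the screen

def refresh(lines, clear_output=True):
    # Prepend the clear sequence so each frame overwrites the previous one in place.
    prefix = HOME_AND_CLEAR if clear_output else ""
    print(prefix + "\n".join(lines), flush=True)

for i in range(3):
    refresh([f"Frame {i}", "Request throughput over the last 5 seconds: 12.34 req/sec"])
    time.sleep(1)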
@@ -145,7 +145,7 @@ def add_delay_messages(

if len(target_timestamp_diffs) >= 2:
speedup_factor = calculate_average_speedup_factor(target_timestamp_diffs)
log_messages.append(f"Speedup Factor (last 5 seconds): {speedup_factor:.0%}")
log_messages.append(f"Speedup Ratio (last 5 seconds): {speedup_factor:.0%}")
else:
log_messages.append("Insufficient data points to calculate Speedup Factor")

@@ -162,17 +162,17 @@ def add_delay_messages(
)

log_messages.append(
f"Rolling average of source delay over last 5 seconds: {source_rolling_average:.3f}"
f"Rolling average of source delay over last 5 seconds: {source_rolling_average:.1f}"
if source_rolling_average is not None
else "Rolling average of source delay over last 5 seconds: N/A"
)
log_messages.append(
f"Rolling average of target delay over last 5 seconds: {target_rolling_average:.3f}"
f"Rolling average of target delay over last 5 seconds: {target_rolling_average:.1f}"
if target_rolling_average is not None
else "Rolling average of target delay over last 5 seconds: N/A"
)
log_messages.append(
f"Difference in rolling averages over last 5 seconds: {rolling_average_diff:.3f}"
f"Difference in rolling averages over last 5 seconds: {rolling_average_diff:.1f}"
if rolling_average_diff is not None
else "Difference in rolling averages over last 5 seconds: N/A"
)
@@ -221,7 +221,7 @@ def main_loop():
target_latest_document, current_time
)

clear_message = "\033c" if not args.no_clear_output else ""
clear_message = "\033[H\033[J" if not args.no_clear_output else ""

log_messages.append(
f"Source latest timestamp: {source_latest_timestamp if source_latest_timestamp else 'N/A'}"
@@ -48,6 +48,6 @@ def run_test_benchmarks(cluster: Cluster):

# As a default we exclude system indices and searchguard indices
def clear_indices(cluster: Cluster):
clear_indices_path = "/*,-.*,-searchguard*,-sg7*"
r = cluster.call_api(clear_indices_path, method=HttpMethod.DELETE)
clear_indices_path = "/*,-.*,-searchguard*,-sg7*,.migrations_working_state"
r = cluster.call_api(clear_indices_path, method=HttpMethod.DELETE, params={"ignore_unavailable": "true"})
return r.content
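The added .migrations_working_state entry names a concrete index rather than a wildcard pattern, so the DELETE would otherwise fail with an index_not_found error whenever that index has not been created yet; passing ignore_unavailable=true (via the params argument added to call_api in the next file's change) presumably lets the cleanup succeed either way. The resulting request is roughly:

DELETE /*,-.*,-searchguard*,-sg7*,.migrations_working_state?ignore_unavailable=true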
@@ -85,7 +85,8 @@ def __init__(self, config: Dict) -> None:
elif 'sigv4' in config:
self.auth_type = AuthMethod.SIGV4

def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None, json_body=None) -> requests.Response:
def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None,
json_body=None, **kwargs) -> requests.Response:
"""
Calls an API on the cluster.
"""
@@ -101,16 +102,19 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None, json
raise NotImplementedError(f"Auth type {self.auth_type} not implemented")

if json_body is not None:
# headers = {'Content-type': 'application/json'}
data = json_body
else:
# headers = None
data = None

logger.info(f"Making api call to {self.endpoint}{path}")

# Extract query parameters from kwargs
params = kwargs.get('params', {})

r = requests.request(
method.name,
f"{self.endpoint}{path}",
params=params,
verify=(not self.allow_insecure),
auth=auth,
timeout=timeout,
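A brief usage sketch of the params pass-through added above, mirroring the clear_indices call from the earlier file (the cluster object is assumed to be an already-configured Cluster): anything passed as params is forwarded to requests.request as URL query parameters.

# Sketch: extra query parameters ride along via the new params keyword argument.
response = cluster.call_api(
    "/*,-.*,-searchguard*,-sg7*,.migrations_working_state",
    method=HttpMethod.DELETE,
    params={"ignore_unavailable": "true"},
)
print(response.status_code)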
@@ -278,7 +278,7 @@ def get_snapshot_status_full(cluster: Cluster, snapshot: str,
if not snapshots:
return CommandResult(success=False, value="Snapshot status not available")

message = get_snapshot_status_message(snapshot_info)
message = get_snapshot_status_message(snapshots[0])
return CommandResult(success=True, value=f"{state}\n{message}")
except Exception as e:
return CommandResult(success=False, value=f"Failed to get full snapshot status: {str(e)}")
@@ -1,6 +1,7 @@
from console_link.models.snapshot import S3Snapshot, FileSystemSnapshot, Snapshot
from console_link.environment import get_snapshot
from console_link.models.cluster import AuthMethod, Cluster, HttpMethod
from console_link.logic import snapshot as snapshot_logic
from tests.utils import create_valid_cluster
import pytest
import unittest.mock as mock
@@ -51,41 +52,52 @@ def test_s3_snapshot_status_full(s3_snapshot, mock_cluster):
mock_response.json.return_value = {
"snapshots": [
{
"snapshot": "test_snapshot",
"snapshot": "rfs-snapshot",
"repository": "migration_assistant_repo",
"uuid": "7JFrWqraSJ20anKfiSIj1Q",
"state": "SUCCESS",
"include_global_state": True,
"shards_stats": {
"total": 10,
"done": 10,
"failed": 0
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 304,
"failed": 0,
"total": 304
},
"stats": {
"total": {
"size_in_bytes": 1000000
"incremental": {
"file_count": 67041,
"size_in_bytes": 440990672819
},
"processed": {
"size_in_bytes": 1000000
"total": {
"file_count": 67041,
"size_in_bytes": 440990672819
},
"start_time_in_millis": 1625097600000,
"time_in_millis": 60000
"start_time_in_millis": 1719343996753,
"time_in_millis": 79426
}
}
]
}
mock_cluster.call_api.return_value = mock_response

result = s3_snapshot.status(deep_check=True)
result = snapshot_logic.status(snapshot=s3_snapshot, deep_check=True)

assert isinstance(result, CommandResult)
assert result.success
assert "SUCCESS" in result.value
assert "Percent completed: 100.00%" in result.value
assert "Total shards: 10" in result.value
assert "Successful shards: 10" in result.value
assert "Total shards: 304" in result.value
assert "Successful shards: 304" in result.value
assert "Failed shards: 0" in result.value
assert "Start time:" in result.value
assert "Duration:" in result.value
assert "Anticipated duration remaining:" in result.value
assert "Throughput:" in result.value

assert "N/A" not in result.value

mock_cluster.call_api.assert_called_with("/_snapshot/migration_assistant_repo/test_snapshot/_status",
HttpMethod.GET)

5 changes: 2 additions & 3 deletions test/awsRunIntegTests.sh
@@ -73,10 +73,9 @@ source_endpoint="http://${source_lb_endpoint}:19200"
proxy_endpoint="http://${source_lb_endpoint}:9200"
target_endpoint=$(aws ssm get-parameter --name "/migration/${STAGE}/default/osClusterEndpoint" --query 'Parameter.Value' --output text)
echo "Clearing non-system source indices"
unbuffer aws ecs execute-command --cluster "migration-${STAGE}-ecs-cluster" --task "${task_arn}" --container "migration-console" --interactive --command "curl -XDELETE ${source_endpoint}/*,-.*,-searchguard*,-sg7*"
unbuffer aws ecs execute-command --cluster "migration-${STAGE}-ecs-cluster" --task "${task_arn}" --container "migration-console" --interactive --command "curl -XDELETE '${source_endpoint}/*,-.*,-searchguard*,-sg7*?ignore_unavailable=true'"
echo "Clearing non-system target indices"
unbuffer aws ecs execute-command --cluster "migration-${STAGE}-ecs-cluster" --task "${task_arn}" --container "migration-console" --interactive --command "curl -XDELETE ${target_endpoint}/*,-.*"

unbuffer aws ecs execute-command --cluster "migration-${STAGE}-ecs-cluster" --task "${task_arn}" --container "migration-console" --interactive --command "curl -XDELETE '${target_endpoint}/*,-.*,.migrations_working_state?ignore_unavailable=true'"
echo "Print initial source and target indices after clearing indices: "
unbuffer aws ecs execute-command --cluster "migration-${STAGE}-ecs-cluster" --task "${task_arn}" --container "migration-console" --interactive --command "./catIndices.sh --source-endpoint ${source_endpoint} --source-no-auth --target-no-auth"
