Skip to content

Commit

Permalink
[Core] Fix status refresh for INIT cluster (#2491)
Browse files Browse the repository at this point in the history
* Fix ports for existing cluster

* refresh ports

* gracefully handle the head_ssh_port to be None

* changed to runtime error

* address comments
  • Loading branch information
Michaelvll authored Aug 31, 2023
1 parent eede371 commit 11298dc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
18 changes: 15 additions & 3 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2012,17 +2012,23 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
# triggered.
if external_ips is None or len(external_ips) == 0:
logger.debug(f'Refreshing status ({cluster_name!r}): No cached '
f'IPs found. External IPs: {external_ips}')
f'IPs found. Handle: {handle}')
raise exceptions.FetchIPError(
reason=exceptions.FetchIPError.Reason.HEAD)

# Potentially refresh the external SSH ports, in case the existing
# cluster before #2491 was launched without external SSH ports
# cached.
external_ssh_ports = handle.external_ssh_ports()
head_ssh_port = external_ssh_ports[0]

# Check if ray cluster status is healthy.
ssh_credentials = ssh_credential_from_yaml(handle.cluster_yaml,
handle.docker_user)
assert handle.head_ssh_port is not None, handle

runner = command_runner.SSHCommandRunner(external_ips[0],
**ssh_credentials,
port=handle.head_ssh_port)
port=head_ssh_port)
rc, output, stderr = runner.run(
RAY_STATUS_WITH_SKY_RAY_PORT_COMMAND,
stream_logs=False,
Expand All @@ -2047,6 +2053,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
f'Refreshing status ({cluster_name!r}) failed to get IPs.')
except RuntimeError as e:
logger.debug(str(e))
except Exception as e: # pylint: disable=broad-except
# This can be raised by `external_ssh_ports()`, due to the
# underlying call to kubernetes API.
logger.debug(
f'Refreshing status ({cluster_name!r}) failed: '
f'{common_utils.format_exception(e, use_bracket=True)}')
return False

# Determining if the cluster is healthy (UP):
Expand Down
17 changes: 12 additions & 5 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,9 +1525,10 @@ def _retry_zones(
if zones and len(zones) == 1:
launched_resources = launched_resources.copy(zone=zones[0].name)

prev_cluster_ips = None
prev_cluster_ips, prev_ssh_ports = None, None
if prev_handle is not None:
prev_cluster_ips = prev_handle.stable_internal_external_ips
prev_ssh_ports = prev_handle.stable_ssh_ports
# Record early, so if anything goes wrong, 'sky status' will show
# the cluster name and users can appropriately 'sky down'. It also
# means a second 'sky launch -c <name>' will attempt to reuse.
Expand All @@ -1544,10 +1545,11 @@ def _retry_zones(
launched_resources=launched_resources,
tpu_create_script=config_dict.get('tpu-create-script'),
tpu_delete_script=config_dict.get('tpu-delete-script'),
# Use the previous cluster's IPs if available to optimize
# the case where the cluster is restarted, i.e., no need to
# query the IPs from the cloud provider.
stable_internal_external_ips=prev_cluster_ips)
# Use the previous cluster's IPs and ports if available to
# optimize the case where the cluster is restarted, i.e., no
# need to query IPs and ports from the cloud provider.
stable_internal_external_ips=prev_cluster_ips,
stable_ssh_ports=prev_ssh_ports)
usage_lib.messages.usage.update_final_cluster_status(
status_lib.ClusterStatus.INIT)

Expand Down Expand Up @@ -2315,6 +2317,7 @@ def _update_cluster_region(self):
self.launched_resources = self.launched_resources.copy(region=region)

def update_ssh_ports(self, max_attempts: int = 1) -> None:
"""Updates the cluster SSH ports cached in the handle."""
# TODO(romilb): Replace this with a call to the cloud class to get ports
# Use port 22 for everything except Kubernetes
if not isinstance(self.launched_resources.cloud, clouds.Kubernetes):
Expand Down Expand Up @@ -2362,6 +2365,10 @@ def update_cluster_ips(
external_ips: The external IPs to use for the cluster. Similar to
internal_ips, it is an optimization to avoid retrieving the
external IPs from the cloud provider.
Raises:
exceptions.FetchIPError: if we failed to get the IPs. e.reason is
HEAD or WORKER.
"""

def is_provided_ips_valid(ips: Optional[List[Optional[str]]]) -> bool:
Expand Down

0 comments on commit 11298dc

Please sign in to comment.