Skip to content

Commit

Permalink
Merge branch 'skypilot-org:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
asaiacai authored Oct 4, 2024
2 parents 4373180 + 12706e9 commit e9f3112
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 21 deletions.
16 changes: 12 additions & 4 deletions sky/jobs/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,18 @@

logger = sky_logging.init_logger(__name__)

_DB_PATH = pathlib.Path('~/.sky/spot_jobs.db')
_DB_PATH = _DB_PATH.expanduser().absolute()
_DB_PATH.parents[0].mkdir(parents=True, exist_ok=True)
_DB_PATH = str(_DB_PATH)

def _get_db_path() -> str:
"""Workaround to collapse multi-step Path ops for type checker.
Ensures _DB_PATH is str, avoiding Union[Path, str] inference.
"""
path = pathlib.Path('~/.sky/spot_jobs.db')
path = path.expanduser().absolute()
path.parents[0].mkdir(parents=True, exist_ok=True)
return str(path)


_DB_PATH = _get_db_path()

# Module-level connection/cursor; thread-safe as the module is only imported
# once.
Expand Down
4 changes: 4 additions & 0 deletions sky/serve/autoscalers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ def _load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
"""Load dynamic states to autoscaler."""
raise NotImplementedError

def get_decision_interval(self) -> int:
"""Get the decision interval for the autoscaler."""
raise NotImplementedError

def load_dynamic_states(self, dynamic_states: Dict[str, Any]) -> None:
"""Load dynamic states to autoscaler."""
self.latest_version_ever_ready = dynamic_states.pop(
Expand Down
2 changes: 1 addition & 1 deletion sky/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def configure_logger():
logger.info('SkyServe Controller started on '
f'http://{self._host}:{self._port}')

uvicorn.run(self._app, host={self._host}, port=self._port)
uvicorn.run(self._app, host=self._host, port=self._port)


# TODO(tian): Probably we should support service that will stop the VM in
Expand Down
2 changes: 1 addition & 1 deletion sky/serve/load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def _sync_with_controller(self):
'request_aggregator':
self._request_aggregator.to_dict()
},
timeout=5,
timeout=aiohttp.ClientTimeout(5),
) as response:
# Clean up after reporting request info to avoid OOM.
self._request_aggregator.clear()
Expand Down
4 changes: 3 additions & 1 deletion sky/serve/replica_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
from sky import resources
from sky.serve import service_spec

logger = sky_logging.init_logger(__name__)
Expand Down Expand Up @@ -172,9 +173,10 @@ def _get_resources_ports(task_yaml: str) -> str:
task = sky.Task.from_yaml(task_yaml)
# Already checked all ports are the same in sky.serve.core.up
assert len(task.resources) >= 1, task
task_resources = list(task.resources)[0]
task_resources: 'resources.Resources' = list(task.resources)[0]
# Already checked the resources have and only have one port
# before upload the task yaml.
assert task_resources.ports is not None
return task_resources.ports[0]


Expand Down
16 changes: 12 additions & 4 deletions sky/serve/serve_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
from sky.serve import replica_managers
from sky.serve import service_spec

_DB_PATH = pathlib.Path(constants.SKYSERVE_METADATA_DIR) / 'services.db'
_DB_PATH = _DB_PATH.expanduser().absolute()
_DB_PATH.parents[0].mkdir(parents=True, exist_ok=True)
_DB_PATH = str(_DB_PATH)

def _get_db_path() -> str:
"""Workaround to collapse multi-step Path ops for type checker.
Ensures _DB_PATH is str, avoiding Union[Path, str] inference.
"""
path = pathlib.Path(constants.SKYSERVE_METADATA_DIR) / 'services.db'
path = path.expanduser().absolute()
path.parents[0].mkdir(parents=True, exist_ok=True)
return str(path)


_DB_PATH: str = _get_db_path()


def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None:
Expand Down
22 changes: 12 additions & 10 deletions sky/serve/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import shutil
import time
import traceback
from typing import Dict, List
from typing import Dict

import filelock

Expand Down Expand Up @@ -116,15 +116,17 @@ def _cleanup(service_name: str) -> bool:
logger.error(f'Replica {info.replica_id} failed to terminate.')
versions = serve_state.get_service_versions(service_name)
serve_state.remove_service_versions(service_name)
success = True
for version in versions:

def cleanup_version_storage(version: int) -> bool:
task_yaml: str = serve_utils.generate_task_yaml_file_name(
service_name, version)
logger.info(f'Cleaning up storage for version {version}, '
f'task_yaml: {task_yaml}')
success = success and cleanup_storage(task_yaml)
if not success:
return cleanup_storage(task_yaml)

if not all(map(cleanup_version_storage, versions)):
failed = True

return failed


Expand Down Expand Up @@ -213,6 +215,7 @@ def _get_host():

# TODO(tian): Support HTTPS.
controller_addr = f'http://{controller_host}:{controller_port}'

load_balancer_port = common_utils.find_free_port(
constants.LOAD_BALANCER_PORT_START)

Expand All @@ -236,13 +239,12 @@ def _get_host():
serve_state.set_service_status_and_active_versions(
service_name, serve_state.ServiceStatus.SHUTTING_DOWN)
finally:
process_to_kill: List[multiprocessing.Process] = []
if load_balancer_process is not None:
process_to_kill.append(load_balancer_process)
if controller_process is not None:
process_to_kill.append(controller_process)
# Kill load balancer process first since it will raise errors if failed
# to connect to the controller. Then the controller process.
process_to_kill = [
proc for proc in [load_balancer_process, controller_process]
if proc is not None
]
subprocess_utils.kill_children_processes(
[process.pid for process in process_to_kill], force=True)
for process in process_to_kill:
Expand Down

0 comments on commit e9f3112

Please sign in to comment.