Skip to content

Commit

Permalink
combine 3 sets to 1 dict
Browse files Browse the repository at this point in the history
  • Loading branch information
andylizf committed Oct 26, 2024
1 parent 6b3b4c8 commit 934bde3
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions sky/jobs/controller.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Controller: handles the life cycle of a managed job."""
import argparse
from concurrent import futures
import enum
import multiprocessing
import os
import pathlib
import queue
import time
import traceback
import typing
from typing import Optional, Set, Tuple
from typing import Dict, Tuple

import filelock

Expand Down Expand Up @@ -45,6 +46,12 @@ def _get_dag_and_name(dag_yaml: str) -> Tuple['sky.Dag', str]:
return dag, dag_name


@enum.unique
class TaskStatus(enum.Enum):
    """Outcome recorded for a single task of a managed job.

    JobsController stores one of these per task in its ``_task_status``
    dict; a task with no entry in that dict has no recorded outcome yet.
    ``enum.unique`` rejects accidental aliasing if two members are ever
    given the same value.
    """
    # Recorded when the task's future reports success.
    COMPLETED = 'completed'
    # Recorded when the task's future does not report success.
    FAILED = 'failed'
    # Recorded for tasks that had no status yet when the job was cancelled.
    CANCELLED = 'cancelled'


class JobsController:
"""Each jobs controller manages the life cycle of one managed job."""

Expand All @@ -59,9 +66,7 @@ def __init__(self, job_id: int, dag_yaml: str,

self._dag_graph = self._dag.get_graph()
self._task_queue = self._initialize_task_queue()
self._completed_tasks: Set[int] = set()
self._failed_tasks: Set[int] = set()
self._block_tasks: Set[int] = set()
self._task_status: Dict['sky.Task', TaskStatus] = {}

# Add a unique identifier to the task environment variables, so that
# the user can have the same id for multiple recoveries.
Expand Down Expand Up @@ -341,9 +346,9 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool:

def _try_add_successors_to_queue(self, task_id: int):
is_task_runnable = lambda task: (all(
self._dag.tasks.index(pred) in self._completed_tasks
for pred in self._dag_graph.predecessors(task)) and task_id not in
self._block_tasks)
self._task_status.get(pred) == TaskStatus.COMPLETED
for pred in self._dag_graph.predecessors(task)
) and self._task_status.get(task) != TaskStatus.CANCELLED)
task = self._dag.tasks[task_id]
for successor in self._dag_graph.successors(task):
successor_id = self._dag.tasks.index(successor)
Expand Down Expand Up @@ -385,29 +390,33 @@ def _handle_future_completion(self, future: futures.Future, task_id: int):
task_id, managed_job_state.ManagedJobStatus.FAILED_CONTROLLER,
msg)
finally:
task = self._dag.tasks[task_id]
if succeeded:
logger.info(
f'Task {task_id} completed with result: {succeeded}')
self._completed_tasks.add(task_id)
self._task_status[task] = TaskStatus.COMPLETED
self._try_add_successors_to_queue(task_id)
else:
logger.info(f'Adding task {task_id} to failed tasks.')
self._failed_tasks.add(task_id)
self._task_status[task] = TaskStatus.FAILED
self._cancel_all_tasks(task_id)

def _cancel_all_tasks(self, task_id: int):
    """Mark every task without a recorded status as cancelled.

    Tasks that already have an entry in ``self._task_status`` keep it; all
    other tasks in the DAG are recorded as ``TaskStatus.CANCELLED``. The
    job is then moved to cancelling and cancelled in ``managed_job_state``.

    Args:
        task_id: Index into ``self._dag.tasks`` of the task used to build
            the event callback (the failed task, when called from
            ``_handle_future_completion``).
    """
    callback_func = managed_job_utils.event_callback_func(
        job_id=self._job_id, task_id=task_id, task=self._dag.tasks[task_id])
    for task in self._dag.tasks:
        # setdefault only writes when the key is absent, so an existing
        # status is never overwritten and no membership check is needed.
        self._task_status.setdefault(task, TaskStatus.CANCELLED)

    # Call set_cancelling before set_cancelled to make sure the table
    # entries are correctly set.
    managed_job_state.set_cancelling(self._job_id, callback_func)
    managed_job_state.set_cancelled(self._job_id, callback_func)

def run(self):
"""Run controller logic and handle exceptions."""
all_tasks_completed = lambda: len(self._dag.tasks) == (len(
self._completed_tasks) + len(self._failed_tasks) + len(
self._block_tasks))
all_tasks_completed = lambda: len(self._dag.tasks) == len(self.
_task_status)
# TODO(andy): Serve has logic to prevent too many services from running
# at the same time. We should have similar logic here, but we
# should calculate the sum of the subtasks (an upper bound), instead of
Expand Down Expand Up @@ -440,7 +449,7 @@ def run(self):
self._handle_future_completion(future, task_id)

def _update_failed_task_state(
self, task_id: Optional[int],
self, task_id: int,
failure_type: managed_job_state.ManagedJobStatus,
failure_reason: str):
"""Update the state of the failed task."""
Expand Down

0 comments on commit 934bde3

Please sign in to comment.