From 9f055f4afea63ca2689e2f64372381adcba828a6 Mon Sep 17 00:00:00 2001
From: Tian Xia
Date: Wed, 30 Oct 2024 00:54:03 -0700
Subject: [PATCH] [Core] Fix job race condition. (#4193)

* [Core] Fix job race condition.

* fix

* simplify url

* change to list_jobs

* upd ray comments

* only store jobs in ray_id_set
---
 sky/skylet/job_lib.py | 76 +++++++++++++++++++------------------------
 1 file changed, 34 insertions(+), 42 deletions(-)

diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py
index 17908a1aec5..12d42d8c79c 100644
--- a/sky/skylet/job_lib.py
+++ b/sky/skylet/job_lib.py
@@ -512,16 +512,13 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]:
     return records
 
 
-def _get_pending_jobs():
-    rows = _CURSOR.execute(
-        'SELECT job_id, created_time, submit FROM pending_jobs')
-    rows = list(rows)
-    return {
-        job_id: {
-            'created_time': created_time,
-            'submit': submit
-        } for job_id, created_time, submit in rows
-    }
+def _get_pending_job(job_id: int) -> Optional[Dict[str, Any]]:
+    rows = _CURSOR.execute('SELECT created_time, submit FROM pending_jobs '
+                           f'WHERE job_id={job_id!r}')
+    for row in rows:
+        created_time, submit = row
+        return {'created_time': created_time, 'submit': submit}
+    return None
 
 
 def update_job_status(job_ids: List[int],
@@ -535,7 +532,7 @@ def update_job_status(job_ids: List[int],
     during job cancelling, we still need this to handle the staleness problem,
     caused by instance restarting and other corner cases (if any).
 
-    This function should only be run on the remote instance with ray==2.4.0.
+    This function should only be run on the remote instance with ray>=2.4.0.
     """
     if len(job_ids) == 0:
         return []
@@ -547,50 +544,45 @@ def update_job_status(job_ids: List[int],
 
     # In ray 2.4.0, job_client.list_jobs returns a list of JobDetails,
     # which contains the job status (str) and submission_id (str).
+    ray_job_query_time = time.time()
     job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs()
 
-    pending_jobs = _get_pending_jobs()
     job_details = {}
     ray_job_ids_set = set(ray_job_ids)
     for job_detail in job_detail_lists:
         if job_detail.submission_id in ray_job_ids_set:
             job_details[job_detail.submission_id] = job_detail
-    job_statuses: List[Optional[JobStatus]] = [None] * len(ray_job_ids)
-    for i, ray_job_id in enumerate(ray_job_ids):
-        job_id = job_ids[i]
-        if ray_job_id in job_details:
-            ray_status = job_details[ray_job_id].status
-            job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status]
-        if job_id in pending_jobs:
-            if pending_jobs[job_id]['created_time'] < psutil.boot_time():
-                logger.info(
-                    f'Job {job_id} is stale, setting to FAILED: '
-                    f'created_time={pending_jobs[job_id]["created_time"]}, '
-                    f'boot_time={psutil.boot_time()}')
-                # The job is stale as it is created before the instance
-                # is booted, e.g. the instance is rebooted.
-                job_statuses[i] = JobStatus.FAILED
-            # Gives a 60 second grace period between job being submit from
-            # the pending table until appearing in ray jobs.
-            if (pending_jobs[job_id]['submit'] > 0 and
-                    pending_jobs[job_id]['submit'] <
-                    time.time() - _PENDING_SUBMIT_GRACE_PERIOD):
-                # For jobs submitted outside of the grace period, we will
-                # consider the ray job status.
-                continue
-            else:
-                # Reset the job status to PENDING even though it may not appear
-                # in the ray jobs, so that it will not be considered as stale.
-                job_statuses[i] = JobStatus.PENDING
-
-    assert len(job_statuses) == len(job_ids), (job_statuses, job_ids)
 
     statuses = []
-    for job_id, status in zip(job_ids, job_statuses):
+    for job_id, ray_job_id in zip(job_ids, ray_job_ids):
         # Per-job status lock is required because between the job status
         # query and the job status update, the job status in the databse
         # can be modified by the generated ray program.
         with filelock.FileLock(_get_lock_path(job_id)):
+            status = None
+            if ray_job_id in job_details:
+                ray_status = job_details[ray_job_id].status
+                status = _RAY_TO_JOB_STATUS_MAP[ray_status]
+            pending_job = _get_pending_job(job_id)
+            if pending_job is not None:
+                if pending_job['created_time'] < psutil.boot_time():
+                    logger.info(f'Job {job_id} is stale, setting to FAILED: '
+                                f'created_time={pending_job["created_time"]}, '
+                                f'boot_time={psutil.boot_time()}')
+                    # The job is stale as it is created before the instance
+                    # is booted, e.g. the instance is rebooted.
+                    status = JobStatus.FAILED
+                # Gives a 60 second grace period between job being submit from
+                # the pending table until appearing in ray jobs. For jobs
+                # submitted outside of the grace period, we will consider the
+                # ray job status.
+                if not (pending_job['submit'] > 0 and pending_job['submit'] <
+                        ray_job_query_time - _PENDING_SUBMIT_GRACE_PERIOD):
+                    # Reset the job status to PENDING even though it may not
+                    # appear in the ray jobs, so that it will not be considered
+                    # as stale.
+                    status = JobStatus.PENDING
+
             original_status = get_status_no_lock(job_id)
             assert original_status is not None, (job_id, status)
             if status is None:
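
A minimal, self-contained sketch of the locking pattern the patch adopts follows. It is illustrative only: _PENDING_JOBS, _JOB_STATUS, _lock_path, and reconcile are hypothetical stand-ins for job_lib's sqlite tables and helpers, and plain strings stand in for the JobStatus enum; only the filelock/psutil usage and the grace-period arithmetic mirror the patched code. The point is that the Ray status, the pending-table lookup, and the final decision are all evaluated under the same per-job lock that guards the write, so the generated ray program cannot change the row between the check and the update.

# Sketch only: illustrates the race-free pattern, not SkyPilot's actual API.
import os
import tempfile
import time
from typing import Any, Dict, Optional

import filelock
import psutil

_PENDING_SUBMIT_GRACE_PERIOD = 60  # seconds, same grace period the patch uses

# Hypothetical in-memory stand-ins for the sqlite-backed tables in job_lib.
_PENDING_JOBS: Dict[int, Dict[str, Any]] = {}
_JOB_STATUS: Dict[int, str] = {}


def _lock_path(job_id: int) -> str:
    return os.path.join(tempfile.gettempdir(), f'.sky_job_{job_id}.lock')


def reconcile(job_id: int, ray_status: Optional[str],
              ray_job_query_time: float) -> None:
    """Decide and persist a job's status under its per-job lock.

    Holding the lock across the read-check-write sequence is what removes
    the race: no other writer can change the status between the pending
    lookup and the final update.
    """
    with filelock.FileLock(_lock_path(job_id)):
        status = ray_status
        pending = _PENDING_JOBS.get(job_id)
        if pending is not None:
            if pending['created_time'] < psutil.boot_time():
                # Submitted before the last reboot; it can never start.
                status = 'FAILED'
            # Grace period between submission and the job showing up in ray.
            if not (pending['submit'] > 0 and pending['submit'] <
                    ray_job_query_time - _PENDING_SUBMIT_GRACE_PERIOD):
                status = 'PENDING'
        if status is not None:
            _JOB_STATUS[job_id] = status


if __name__ == '__main__':
    _PENDING_JOBS[1] = {'created_time': time.time(), 'submit': time.time()}
    reconcile(1, ray_status=None, ray_job_query_time=time.time())
    print(_JOB_STATUS[1])  # PENDING: just submitted, not visible to ray yet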