Skip to content

Commit

Permalink
[Core] Fix job race condition. (#4193)
Browse files Browse the repository at this point in the history
* [Core] Fix job race condition.

* fix

* simplify url

* change to list_jobs

* upd ray comments

* only store jobs in ray_id_set
  • Loading branch information
cblmemo authored Oct 30, 2024
1 parent 46ea0d8 commit 9f055f4
Showing 1 changed file with 34 additions and 42 deletions.
76 changes: 34 additions & 42 deletions sky/skylet/job_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,16 +512,13 @@ def _get_jobs_by_ids(job_ids: List[int]) -> List[Dict[str, Any]]:
return records


def _get_pending_jobs():
    """Return all rows of the pending_jobs table, keyed by job id.

    Returns:
        A dict mapping each job_id to a dict holding that row's
        'created_time' and 'submit' values.
    """
    query = 'SELECT job_id, created_time, submit FROM pending_jobs'
    # Materialize the result set before building the mapping.
    fetched = list(_CURSOR.execute(query))
    pending = {}
    for job_id, created_time, submit in fetched:
        pending[job_id] = {'created_time': created_time, 'submit': submit}
    return pending
def _get_pending_job(job_id: int) -> Optional[Dict[str, Any]]:
    """Look up a single job in the pending_jobs table.

    Args:
        job_id: ID of the job to look up.

    Returns:
        A dict with the row's 'created_time' and 'submit' values, or None
        if the table has no row for job_id.
    """
    # Bind job_id as a query parameter instead of formatting it into the SQL
    # text, so the value can never be interpreted as SQL.
    # NOTE(review): uses the qmark ('?') placeholder style, which assumes
    # _CURSOR is a sqlite3 cursor -- confirm against its definition.
    rows = _CURSOR.execute(
        'SELECT created_time, submit FROM pending_jobs WHERE job_id=(?)',
        (job_id,))
    for created_time, submit in rows:
        # Only the first matching row is used.
        return {'created_time': created_time, 'submit': submit}
    return None


def update_job_status(job_ids: List[int],
Expand All @@ -535,7 +532,7 @@ def update_job_status(job_ids: List[int],
during job cancelling, we still need this to handle the staleness problem,
caused by instance restarting and other corner cases (if any).
This function should only be run on the remote instance with ray==2.4.0.
This function should only be run on the remote instance with ray>=2.4.0.
"""
if len(job_ids) == 0:
return []
Expand All @@ -547,50 +544,45 @@ def update_job_status(job_ids: List[int],

# In ray 2.4.0, job_client.list_jobs returns a list of JobDetails,
# which contains the job status (str) and submission_id (str).
ray_job_query_time = time.time()
job_detail_lists: List['ray_pydantic.JobDetails'] = job_client.list_jobs()

pending_jobs = _get_pending_jobs()
job_details = {}
ray_job_ids_set = set(ray_job_ids)
for job_detail in job_detail_lists:
if job_detail.submission_id in ray_job_ids_set:
job_details[job_detail.submission_id] = job_detail
job_statuses: List[Optional[JobStatus]] = [None] * len(ray_job_ids)
for i, ray_job_id in enumerate(ray_job_ids):
job_id = job_ids[i]
if ray_job_id in job_details:
ray_status = job_details[ray_job_id].status
job_statuses[i] = _RAY_TO_JOB_STATUS_MAP[ray_status]
if job_id in pending_jobs:
if pending_jobs[job_id]['created_time'] < psutil.boot_time():
logger.info(
f'Job {job_id} is stale, setting to FAILED: '
f'created_time={pending_jobs[job_id]["created_time"]}, '
f'boot_time={psutil.boot_time()}')
# The job is stale as it is created before the instance
# is booted, e.g. the instance is rebooted.
job_statuses[i] = JobStatus.FAILED
# Gives a 60 second grace period between job being submit from
# the pending table until appearing in ray jobs.
if (pending_jobs[job_id]['submit'] > 0 and
pending_jobs[job_id]['submit'] <
time.time() - _PENDING_SUBMIT_GRACE_PERIOD):
# For jobs submitted outside of the grace period, we will
# consider the ray job status.
continue
else:
# Reset the job status to PENDING even though it may not appear
# in the ray jobs, so that it will not be considered as stale.
job_statuses[i] = JobStatus.PENDING

assert len(job_statuses) == len(job_ids), (job_statuses, job_ids)

statuses = []
for job_id, status in zip(job_ids, job_statuses):
for job_id, ray_job_id in zip(job_ids, ray_job_ids):
# Per-job status lock is required because between the job status
# query and the job status update, the job status in the database
# can be modified by the generated ray program.
with filelock.FileLock(_get_lock_path(job_id)):
status = None
if ray_job_id in job_details:
ray_status = job_details[ray_job_id].status
status = _RAY_TO_JOB_STATUS_MAP[ray_status]
pending_job = _get_pending_job(job_id)
if pending_job is not None:
if pending_job['created_time'] < psutil.boot_time():
logger.info(f'Job {job_id} is stale, setting to FAILED: '
f'created_time={pending_job["created_time"]}, '
f'boot_time={psutil.boot_time()}')
# The job is stale as it is created before the instance
# is booted, e.g. the instance is rebooted.
status = JobStatus.FAILED
# Give a 60 second grace period between a job being submitted from
# the pending table and it appearing in ray jobs. For jobs
# submitted outside of the grace period, we will consider the
# ray job status.
if not (pending_job['submit'] > 0 and pending_job['submit'] <
ray_job_query_time - _PENDING_SUBMIT_GRACE_PERIOD):
# Reset the job status to PENDING even though it may not
# appear in the ray jobs, so that it will not be considered
# as stale.
status = JobStatus.PENDING

original_status = get_status_no_lock(job_id)
assert original_status is not None, (job_id, status)
if status is None:
Expand Down

0 comments on commit 9f055f4

Please sign in to comment.