Skip to content

Commit

Permalink
Bug fix that should stop a deadlock when sacct_frequency is set too low
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanrichholt committed Feb 8, 2019
1 parent 9a89513 commit 212d893
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
2 changes: 1 addition & 1 deletion jetstream/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def spawn(self, task):
log.debug('Spawn: {}'.format(task))

if 'cmd' not in task.directives():
return task.complete(0)
return task.complete()

cmd = task.directives()['cmd']
cpus = task.directives().get('cpus', 0)
Expand Down
32 changes: 20 additions & 12 deletions jetstream/backends/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ def __init__(self, sacct_frequency=60, sbatch_delay=0.1,
log.info('SlurmBackend initialized')

def _bump_next_update(self):
d = timedelta(seconds=self.sacct_frequency)
self._next_update = datetime.now() + d
log.debug(f'Next sacct update at {self._next_update.isoformat()}')
self._next_update = datetime.now() + timedelta(seconds=self.sacct_frequency)
log.debug(f'Next sacct update bumped to {self._next_update.isoformat()}')

async def wait_for_next_update(self):
"""This allows the wait time to be bumped up each time a job is
submitted. This means that sacct will never be checked immediately
after submitting jobs, and it protects against finding data from
old jobs with the same job ID in the database"""
while datetime.now() < self._next_update:
sleep_delta = self._next_update - datetime.now()
sleep_seconds = max(0, sleep_delta.total_seconds())
Expand All @@ -73,22 +76,24 @@ async def job_monitor(self):
try:
while 1:
await self.wait_for_next_update()
self._bump_next_update()

if not self.jobs:
log.debug('No current jobs to check')
self._bump_next_update()
continue

sacct_data = sacct(*self.jobs, return_data=True)
self._bump_next_update()

for jid, data in sacct_data.items():
if jid in self.jobs:
job = self.jobs[jid]
job.job_data = data

if job.is_done():
if job.is_done() and job.event is not None:
job.event.set()
self.jobs.pop(jid)

except CancelledError:
if self.jobs:
log.info('Requesting scancel for outstanding slurm jobs')
Expand Down Expand Up @@ -135,11 +140,9 @@ async def spawn(self, task):
log.debug(f'Spawn: {task}')

if not task.directives().get('cmd'):
return 0
return task.complete()

time.sleep(self.sbatch_delay) # sbatch breaks when called too frequently

self._bump_next_update()
stdin, stdout, stderr = self.get_fd_paths(task)

job = sbatch(
Expand All @@ -161,19 +164,20 @@ async def spawn(self, task):
slurm_job_id=job.jid,
slurm_cmd=' '.join(shlex.quote(a) for a in job.args)
)

self._bump_next_update()
log.info(f'SlurmBackend submitted: {task}')

event = asyncio.Event(loop=self.runner.loop)
job.event = event
job.event = asyncio.Event(loop=self.runner.loop)
self.jobs[job.jid] = job

try:
await event.wait()
await job.event.wait()
log.debug('Job event was set, gathering info and notifying workflow')

if self.sacct_fields:
job_info = {k: v for k, v in job.job_data.items() if
k in self.sacct_fields}

task.state['slurm_sacct'] = job_info

if job.is_ok():
Expand All @@ -183,10 +187,14 @@ async def spawn(self, task):
log.info(f'Failed: {task}')
task.fail(job.returncode())

log.debug('Done notifying workflow')

except asyncio.CancelledError:
job.cancel()
task.state['err'] = 'Runner cancelled Backend.spawn'
task.fail(-15)
finally:
log.debug(f'Slurmbackend spawn completed for {task}')


class SlurmBatchJob(object):
Expand Down
8 changes: 8 additions & 0 deletions jetstream/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def loop(self):
return self._loop

async def _autosave_coro(self):
log.debug('Autosaver started!')
self._last_save = datetime.now()
try:
while 1:
Expand All @@ -91,6 +92,7 @@ async def _autosave_coro(self):
# the minimum interval is reached.
time_since_last = datetime.now() - self._last_save
delay_for = mn - time_since_last
log.debug(f'Autosaver hold for {delay_for}s')
await asyncio.sleep(delay_for.seconds)

# Otherwise, wait for the next future to return and then save
Expand All @@ -100,15 +102,21 @@ async def _autosave_coro(self):
timeout_at = self._last_save + mx
timeout_in = (timeout_at - datetime.now()).seconds

log.debug(f'Autosaver next save in {timeout_in}s')
try:
await asyncio.wait_for(e.wait(), timeout=timeout_in)
log.debug(f'Autosaver wait cleared by runner')
except asyncio.TimeoutError:
log.debug(f'Autosaver wait timed out')
pass

log.debug('Autosaver saving workflow...')
self.save_workflow()
self._last_save = datetime.now()
except asyncio.CancelledError:
pass
finally:
log.debug('Autosaver stopped!')

def _cleanup_event_loop(self):
"""If the async event loop has outstanding futures, they must be
Expand Down

0 comments on commit 212d893

Please sign in to comment.