Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SLURM wrapper added for pyani #236

Closed
wants to merge 13 commits into from
15 changes: 7 additions & 8 deletions pyani/anib.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import re
import shutil
import subprocess
import secrets

from logging import Logger
from pathlib import Path
Expand Down Expand Up @@ -431,14 +432,12 @@ def construct_blastn_cmdline(
:param blastn_exe: str, path to blastn executable
"""
prefix = outdir / f"{fname1.stem.replace('-fragments', '')}_vs_{fname2.stem}"
return (
f"{blastn_exe} -out {prefix}.blast_tab -query {fname1} -db {fname2} "
"-xdrop_gap_final 150 -dust no -evalue 1e-15 -max_target_seqs 1 -outfmt "
"'6 qseqid sseqid length mismatch pident nident qlen slen "
"qstart qend sstart send positive ppos gaps' "
"-task blastn"
)

command = "#!/bin/bash\n" + f"{blastn_exe} -out {prefix}.blast_tab -query {fname1} -db {fname2} -xdrop_gap_final 150 -dust no -evalue 1e-15 -max_target_seqs 1 -outfmt '6 qseqid sseqid length mismatch pident nident qlen slen qstart qend sstart send positive ppos gaps' -task blastn"
fname = secrets.token_hex(nbytes=16) # generates random string to use as filename
print(command)
with open("jobs/" + fname, 'w') as fh:
fh.write(command + "\n")
return "bash ./jobs/" + fname

# Generate single BLASTALL command line
def construct_blastall_cmdline(
Expand Down
5 changes: 3 additions & 2 deletions pyani/anim.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def get_version(nucmer_exe: Path = pyani_config.NUCMER_DEFAULT) -> str:
result = subprocess.run(
cmdline, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
)
match = re.search(r"(?<=version\s)[0-9\.]*", str(result.stderr, "utf-8"))
version = match.group() # type: ignore
#match = re.search(r"(?<=version\s)[0-9\.]*", str(result.stderr, "utf-8"))
#version = match.group() # type: ignore
version = str(result.stdout, "utf-8")
return f"{platform.system()}_{version}"


Expand Down
2 changes: 1 addition & 1 deletion pyani/pyani_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
BLASTALL_DEFAULT = Path("blastall")
FORMATDB_DEFAULT = Path("formatdb")
QSUB_DEFAULT = Path("qsub")

SLURM_DEFAULT = Path("sbatch")
# Stems for output files
ANIM_FILESTEMS = (
"ANIm_alignment_lengths",
Expand Down
38 changes: 30 additions & 8 deletions pyani/pyani_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

import os
import time

from subprocess import Popen, PIPE
from typing import Any, Dict, List, Optional

from .pyani_config import SGE_WAIT
Expand Down Expand Up @@ -119,7 +119,6 @@ def wait(self, interval: float = SGE_WAIT) -> None:
interval = min(2.0 * interval, 60)
self.finished = os.system(f"qstat -j {self.name} > /dev/null")


class JobGroup(object):

"""Class that stores a group of jobs, permitting parameter sweeps."""
Expand All @@ -128,8 +127,10 @@ def __init__(
self,
name: str,
command: str,
scheduler: str,
queue: Optional[str] = None,
arguments: Optional[Dict[str, List[Any]]] = None,

) -> None:
"""Instantiate a JobGroup object.

Expand All @@ -144,7 +145,7 @@ def __init__(
For example, to use a command 'my_cmd' with the arguments
'-foo' and '-bar' having values 1, 2, 3, 4 and 'a', 'b', 'c', 'd' in
all combinations, respectively, you would pass
command='my_cmd $SGE_TASK_ID -foo $fooargs -bar $barargs'
command='my_cmd $SLURM_TASK_ID -foo $fooargs -bar $barargs'
arguments='{'fooargs': ['1','2','3','4'],
'barargs': ['a','b','c','d']}
"""
Expand All @@ -154,20 +155,21 @@ def __init__(
self.dependencies = [] # type: List[Any]
self.submitted = False # type: bool
self.finished = False # type: int
self.scheduler = scheduler
if arguments is not None:
self.arguments = arguments # Dictionary of arguments for command
else:
self.arguments = {}
self.generate_script() # Make SGE script for sweep/array

def generate_script(self) -> None:
"""Create the SGE script that will run the jobs in the JobGroup."""
"""Create the SLURM script that will run the jobs in the JobGroup."""
self.script = "" # type: str
total = 1 # total number of jobs in this group

# for now, SGE_TASK_ID becomes TASK_ID, but we base it at zero
self.script += """let "TASK_ID=$SGE_TASK_ID - 1"\n"""

# for now, SLURM_TASK_ID becomes TASK_ID, but we base it at zero
#self.script += """let "TASK_ID=$SLURM_TASK_ID - 1"\n"""
self.script += """let "TASK_ID=$SLURM_ARRAY_TASK_ID - 1"\n"""
# build the array definitions
for key in sorted(self.arguments.keys()):
# The keys are sorted for py3.5 compatibility with tests
Expand All @@ -191,6 +193,7 @@ def generate_script(self) -> None:

# now, add the command to run the job
self.script += "\n"
self.script += "echo " + self.command + "\n"
self.script += self.command
self.script += "\n"

Expand Down Expand Up @@ -222,4 +225,23 @@ def wait(self, interval: float = SGE_WAIT) -> None:
while not self.finished:
time.sleep(interval)
interval = min(2 * interval, 60)
self.finished = os.system("qstat -j %s > /dev/null" % (self.name))

if self.scheduler.lower() == "sge" : # hpc is SGE
self.finished = os.system("qstat -j %s > /dev/null" % (self.name))

elif self.scheduler.lower() == "slurm" : # hpc is SLURM
print("Scheduler slurm: squeue -n %s" % (self.name), "finished? ", self.finished)
cmd = "squeue -n %s | tail -n+2 | wc -l" % (self.name)
count = get_cmd_output(cmd)

if int(count) == 0:
self.finished = True
print("Finished ", self.finished)

def get_cmd_output(cmd: str) -> str:
    """Run *cmd* in a shell and return its decoded stdout.

    :param cmd: str, shell command line to execute

    NOTE(review): shell=True is required here because callers pass whole
    pipelines (e.g. "squeue -n NAME | tail -n+2 | wc -l"), but it routes
    the string through the shell — callers must never pass unsanitised
    external input.
    """
    proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)  # nosec
    out, err = proc.communicate()
    # stderr was previously captured and silently dropped; keep capturing
    # it (so it does not leak to the terminal) but note failures are not
    # raised here — callers inspect stdout only.
    return str(out, "utf-8")
28 changes: 14 additions & 14 deletions pyani/run_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def compile_jobgroups_from_joblist(
sge_jobcmdlist = [f'"{jc}"' for jc in sublist]
jobgroups.append(
JobGroup(
f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}
f"{jgprefix}_{count}", "$cmds", arguments={"cmds": sge_jobcmdlist}, scheduler="sge"
)
)
return jobgroups
Expand All @@ -117,15 +117,15 @@ def run_dependency_graph(
jobgraph,
jgprefix: str = "ANIm_SGE_JG",
sgegroupsize: int = 10000,
sgeargs: Optional[str] = None,
schedulerargs: Optional[str] = None,
) -> None:
"""Create and runs SGE scripts for jobs based on passed jobgraph.

:param jobgraph: list of jobs, which may have dependencies.
:param verbose: flag for multiprocessing verbosity
:param jgprefix: a prefix for the submitted jobs, in the scheduler
:param sgegroupsize: the maximum size for an array job submission
:param sgeargs: additional arguments to qsub
:param schedulerargs: additional arguments to qsub

The strategy here is to loop over each job in the dependency graph
and, because we expect a single main delta-filter (wrapped) job,
Expand Down Expand Up @@ -180,7 +180,7 @@ def run_dependency_graph(
logger.info("Jobs passed to scheduler in order:")
for job in jobgroups:
logger.info("\t%s" % job.name)
build_and_submit_jobs(Path.cwd(), jobgroups, sgeargs)
build_and_submit_jobs(Path.cwd(), jobgroups, schedulerargs)
logger.info("Waiting for SGE-submitted jobs to finish (polling)")
for job in jobgroups:
job.wait()
Expand Down Expand Up @@ -263,13 +263,13 @@ def extract_submittable_jobs(waiting: List) -> List:


def submit_safe_jobs(
root_dir: Path, jobs: Iterable, sgeargs: Optional[str] = None
root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None
) -> None:
"""Submit passed list of jobs to SGE server with dir as root for output.

:param root_dir: path to output directory
:param jobs: iterable of Job objects
:param sgeargs: str, additional arguments for qsub
:param schedulerargs: str, additional arguments for qsub
"""
logger = logging.getLogger(__name__)
logger.debug("Received %s jobs", len(jobs))
Expand Down Expand Up @@ -304,41 +304,41 @@ def submit_safe_jobs(

# Build the qsub SGE commandline (passing local environment)
qsubcmd = f"{pyani_config.QSUB_DEFAULT} -V {args} {job.scriptpath}"
if sgeargs is not None:
qsubcmd = f"{qsubcmd} {sgeargs}"
if schedulerargs is not None:
qsubcmd = f"{qsubcmd} {schedulerargs}"
# We've considered Bandit warnings B404,B603 and silence
# subprocess.call(qsubcmd, shell=False) # nosec
os.system(qsubcmd)
job.submitted = True # Set the job's submitted flag to True


def submit_jobs(root_dir: Path, jobs: Iterable, sgeargs: Optional[str] = None) -> None:
def submit_jobs(root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None) -> None:
    """Submit passed jobs to SGE server with passed directory as root.

    :param root_dir: path to output directory
    :param jobs: list of Job objects
    :param schedulerargs: str, additional arguments for qsub

    Repeatedly selects the subset of jobs whose dependencies are already
    satisfied, submits them, and removes them from the pending pool until
    no jobs remain.
    """
    pending = list(jobs)  # jobs not yet handed to the scheduler
    while pending:
        # Pick out the jobs that can be submitted right now
        ready = extract_submittable_jobs(pending)
        # Hand them to the scheduler
        submit_safe_jobs(root_dir, ready, schedulerargs)
        # Drop the submitted jobs from the pending pool
        for submitted in ready:
            pending.remove(submitted)


def build_and_submit_jobs(
root_dir: Path, jobs: Iterable, sgeargs: Optional[str] = None
root_dir: Path, jobs: Iterable, schedulerargs: Optional[str] = None
) -> None:
"""Submit passed iterable of Job objects to SGE.

:param root_dir: root directory for SGE and job output
:param jobs: list of Job objects, describing each job to be submitted
:param sgeargs: str, additional arguments to qsub
:param schedulerargs: str, additional arguments to qsub

This places SGE's output in the passed root directory
"""
Expand All @@ -350,4 +350,4 @@ def build_and_submit_jobs(
# Build and submit the passed jobs
build_directories(root_dir) # build all necessary directories
build_job_scripts(root_dir, jobs) # build job scripts
submit_jobs(root_dir, jobs, sgeargs) # submit the jobs to SGE
submit_jobs(root_dir, jobs, schedulerargs) # submit the jobs to SGE
Loading