Merge pull request #85 from cgat-developers/AH-add-gevent-pool-compatibility

Add gevent pool compatibility
AndreasHeger authored May 23, 2018
2 parents b57ca2c + 4424966 commit 4dbf273
Showing 11 changed files with 261 additions and 105 deletions.
19 changes: 7 additions & 12 deletions .travis.yml
@@ -3,19 +3,14 @@ language: python
# all python versions tested via pip in tox
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"

# Install stuff
before_install:
# - travis/install_sge.sh
# - export SGE_ROOT=/var/lib/gridengine
# - export SGE_CELL=default
# - export DRMAA_LIBRARY_PATH=/usr/lib/libdrmaa.so.1.0
# Work around for multiprocessing permission bug with Travis
# - sudo rm -rf /dev/shm
# - sudo ln -s /run/shm /dev/shm

install: python setup.py install
script: cd ruffus/test && /bin/bash run_all_unit_tests.cmd

install:
- pip install tox
script: tox

sudo: false
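The CI setup now delegates test runs to tox ("pip install tox" followed by "tox") instead of installing the package and invoking the shell test script directly. For orientation only, a minimal tox.ini of the kind this flow assumes might look as follows; the actual tox.ini is not shown in this excerpt, so every value here is illustrative:

    # Hypothetical tox.ini sketch -- the real file is not part of this excerpt.
    [tox]
    envlist = py27, py34, py35, py36

    [testenv]
    changedir = ruffus/test
    whitelist_externals = /bin/bash
    commands = /bin/bash run_all_unit_tests.cmd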
51 changes: 25 additions & 26 deletions ruffus/cmdline.py
@@ -174,18 +174,17 @@ def append_to_argparse (parser, **args_dict):
else:
ignored_args = set()



common_options = parser.add_argument_group('Common options')
if "verbose" not in ignored_args:
common_options.add_argument('--verbose', "-v", const="+", default=[], nargs='?',
action="append",
help="Print more verbose messages for each additional verbose level.")
common_options.add_argument(
'--verbose', "-v", const="+", default=[], nargs='?',
action="append",
help="Print more verbose messages for each additional verbose level.")
if "version" not in ignored_args:
common_options.add_argument('--version', action='version', version=prog_version)
if "log_file" not in ignored_args:
common_options.add_argument("-L", "--log_file", metavar="FILE", type=str,
help="Name and path of log file")
help="Name and path of log file")


#
@@ -357,15 +356,15 @@ def append_to_optparse (parser, **args_dict):
# general options: verbosity / logging
#
if "verbose" not in ignored_args:
parser.add_option("-v", "--verbose", dest = "verbose",
parser.add_option("-v", "--verbose", dest="verbose",
action="callback", default=[],
#---------------------------------------------------------------
# ---------------------------------------------------------------
# hack to get around unreasonable discrimination against
# --long_options=with_equals in opt_parse::_process_long_opt()
# when using a callback
type = int,
type=int,
nargs=0,
#---------------------------------------------------------------
# ---------------------------------------------------------------
callback=vararg_callback,
help="Print more verbose messages for each additional verbose level.")
if "log_file" not in ignored_args:
@@ -377,30 +376,30 @@ def append_to_optparse (parser, **args_dict):
# pipeline
#
if "target_tasks" not in ignored_args:
parser.add_option("-t", "--target_tasks", dest="target_tasks",
action="append",
default = list(),
metavar="JOBNAME",
type="string",
help="Target task(s) of pipeline.")
parser.add_option("-t", "--target_tasks", dest="target_tasks",
action="append",
default=list(),
metavar="JOBNAME",
type="string",
help="Target task(s) of pipeline.")
if "jobs" not in ignored_args:
parser.add_option("-j", "--jobs", dest="jobs",
default=1,
metavar="N",
type="int",
help="Allow N jobs (commands) to run simultaneously.")
default=1,
metavar="N",
type="int",
help="Allow N jobs (commands) to run simultaneously.")
if "use_threads" not in ignored_args:
parser.add_option("--use_threads", dest="use_threads",
action="store_true", default=False,
help="Use multiple threads rather than processes. Needs --jobs N with N > 1")
action="store_true", default=False,
help="Use multiple threads rather than processes. Needs --jobs N with N > 1")
if "just_print" not in ignored_args:
parser.add_option("-n", "--just_print", dest="just_print",
action="store_true", default=False,
help="Don't actually run any commands; just print the pipeline.")
action="store_true", default=False,
help="Don't actually run any commands; just print the pipeline.")
if "touch_files_only" not in ignored_args:
parser.add_option("--touch_files_only", dest="touch_files_only",
action="store_true", default=False,
help="Don't actually run any commands; just 'touch' the output for each task to make them appear up to date.")
action="store_true", default=False,
help="Don't actually run any commands; just 'touch' the output for each task to make them appear up to date.")
if "recreate_database" not in ignored_args:
parser.add_option("--recreate_database", dest="recreate_database",
action="store_true", default=False,
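The cmdline.py changes above are almost entirely PEP 8 whitespace fixes around the option definitions. For context, these helpers back ruffus's standard command-line boilerplate; a typical script uses them roughly like this (a sketch based on the documented cmdline API, not code from this commit):

    # Sketch of the usual ruffus.cmdline flow; not part of this diff.
    from ruffus import cmdline

    parser = cmdline.get_argparse(description="Example pipeline")
    options = parser.parse_args()

    # ...pipeline tasks would be defined here...

    # cmdline.run() forwards recognised options such as --jobs,
    # --use_threads and --just_print on to pipeline_run().
    cmdline.run(options)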
2 changes: 1 addition & 1 deletion ruffus/ruffus_version.py
@@ -24,4 +24,4 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#################################################################################
__version='2.6.3'
__version = '2.7.0'
107 changes: 63 additions & 44 deletions ruffus/task.py
@@ -82,7 +82,8 @@ def func_b ():
import os
import sys
import copy
import multiprocessing
from multiprocessing import Pool as ProcessPool
from multiprocessing.pool import ThreadPool
import collections

# 88888888888888888888888888888888888888888888888888888888888888888888888888888
@@ -94,8 +95,6 @@ def func_b ():
import logging
import re
from collections import defaultdict, deque
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import traceback
import types
if sys.hexversion >= 0x03000000:
@@ -901,7 +900,11 @@ def __init__(self, name, *arg, **kw):

self.command_str_callback = subprocess_checkcall_wrapper


@classmethod
def clear_all(cls):
"""clear all pipelines.
"""
cls.pipelines = dict()
# _________________________________________________________________________

# _create_task
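The new Pipeline.clear_all() classmethod above resets the module-level pipeline registry, which is mainly useful when tests construct pipelines repeatedly in one process. A minimal sketch of the intended use (the clear_all() call is from this diff; the surrounding lines are illustrative):

    # Sketch: reset the global pipeline registry between test cases.
    from ruffus import Pipeline

    Pipeline.clear_all()    # forget every previously constructed pipeline
    p = Pipeline("test")    # the name "test" can now be reused safely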
@@ -974,7 +977,6 @@ def _complete_task_setup(self, processed_tasks):
Make sure all tasks in dependency list are linked to real functions
"""


processed_pipelines = set([self.name])
unprocessed_tasks = deque(self.tasks)
while len(unprocessed_tasks):
Expand All @@ -996,7 +998,6 @@ def _complete_task_setup(self, processed_tasks):
task._is_single_job_single_output = \
task._is_single_job_single_output._is_single_job_single_output


for pipeline_name in list(processed_pipelines):
if pipeline_name != self.name:
processed_pipelines |= self.pipelines[pipeline_name]._complete_task_setup(processed_tasks)
@@ -4424,12 +4425,8 @@ def lookup_pipeline(pipeline):
raise error_not_a_pipeline("%s does not name a pipeline." % pipeline)




# _____________________________________________________________________________

# _pipeline_prepare_to_run

# _____________________________________________________________________________
def _pipeline_prepare_to_run(checksum_level, history_file, pipeline, runtime_data, target_tasks, forcedtorun_tasks):
"""
@@ -5438,13 +5435,13 @@ def pipeline_run(target_tasks=[],
history_file=None,
# defaults to 2 if None
verbose_abbreviated_path=None,
pipeline=None):
pipeline=None,
pool_manager="multiprocessing"):
# Remember to add further extra parameters here to
# "extra_pipeline_run_options" inside cmdline.py
# This will forward extra parameters from the command line to
# pipeline_run
"""
Run pipelines.
"""Run pipelines.
:param target_tasks: targets task functions which will be run if they are
out-of-date
@@ -5459,7 +5456,7 @@
is particularly useful to manage high performance
clusters which otherwise are prone to
"processor storms" when large number of cores finish
jobs at the same time. (Thanks Andreas Heger)
jobs at the same time.
:param logger: Where progress will be logged. Defaults to stderr output.
:type logger: `logging <http://docs.python.org/library/logging.html>`_
objects
@@ -5547,18 +5544,41 @@
syncmanager = multiprocessing.Manager()

#
# whether using multiprocessing or multithreading
#
if multithread:
pool = ThreadPool(multithread)
parallelism = multithread
elif multiprocess > 1:
pool = Pool(multiprocess)
parallelism = multiprocess
# select pool and queue type. Selection is convoluted
# for backwards compatibility.
itr_kwargs = {}
if multiprocess is None:
multiprocess = 0
if multithread is None:
multithread = 0
parallelism = max(multiprocess, multithread)
if parallelism > 1:
if pool_manager == "multiprocessing":
if multithread:
pool_t = ThreadPool
queue_t = queue.Queue
elif multiprocess > 1:
pool_t = ProcessPool
queue_t = queue.Queue
# Use a timeout of 3 years per job..., so that the condition
# we are waiting for in the thread can be interrupted by
# signals... In other words, so that Ctrl-C works
# Yucky part is that timeout is an extra parameter to
# IMapIterator.next(timeout=None) but next() for normal
# iterators do not take any extra parameters.
itr_kwargs = dict(timeout=99999999)
elif pool_manager == "gevent":
import gevent.queue
import gevent.pool
pool_t = gevent.pool.Pool
queue_t = gevent.queue.Queue
else:
raise ValueError("unknown pool manager '{}'".format(pool_manager))
pool = pool_t(parallelism)
else:
parallelism = 1
pool = None

queue_t = queue.Queue

if verbose == 0:
logger = black_hole_logger
elif verbose >= 11:
@@ -5569,7 +5589,6 @@
if hasattr(logger, "add_unique_prefix"):
logger.add_unique_prefix()


(checksum_level,
job_history,
pipeline,
@@ -5701,8 +5720,8 @@
# prime queue with initial set of job parameters
#
death_event = syncmanager.Event()
parameter_q = queue.Queue()
task_with_completed_job_q = queue.Queue()
parameter_q = queue_t()
task_with_completed_job_q = queue_t()
parameter_generator = make_job_parameter_generator(incomplete_tasks, task_parents,
logger, forcedtorun_tasks,
task_with_completed_job_q,
@@ -5751,7 +5770,7 @@
# except StopIteration:
# break

if pool:
if pool is not None:
pool_func = pool.imap_unordered
else:
pool_func = map
@@ -5777,14 +5796,8 @@
# feed_job_params_to_process_pool()):
ii = iter(pool_func(run_pooled_job_without_exceptions, feed_job_params_to_process_pool()))
while 1:
# Use a timeout of 3 years per job..., so that the condition
# we are waiting for in the thread can be interrupted by
# signals... In other words, so that Ctrl-C works
# Yucky part is that timeout is an extra parameter to
# IMapIterator.next(timeout=None) but next() for normal
# iterators do not take any extra parameters.
if pool:
job_result = ii.next(timeout=99999999)
if pool is not None:
job_result = ii.next(**itr_kwargs)
else:
job_result = next(ii)
# run next task
@@ -5917,9 +5930,10 @@
except:
pass

if pool:
log_at_level(logger, 10, verbose, " pool.close")
pool.close()
if pool is not None:
if hasattr(pool, "close"):
log_at_level(logger, 10, verbose, " pool.close")
pool.close()
log_at_level(logger, 10, verbose, " pool.terminate")
try:
pool.terminate()
Expand All @@ -5931,16 +5945,21 @@ def pipeline_run(target_tasks=[],
# log_at_level (logger, 10, verbose, " syncmanager.shutdown")
# syncmanager.shutdown()

if pool:
if pool is not None:
log_at_level(logger, 10, verbose, " pool.close")
# pool.join()
pool.close()
try:
pool.close()
except AttributeError:
pass
log_at_level(logger, 10, verbose, " pool.terminate")
# an exception may be thrown after a signal is caught (Ctrl-C)
# when the EventProxy(s) for death_event might be left hanging
try:
pool.terminate()
except:
except AttributeError:
pass
except Exception:
# an exception may be thrown after a signal is caught (Ctrl-C)
# when the EventProxy(s) for death_event might be left hanging
pass
log_at_level(logger, 10, verbose, " pool.terminated")

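Taken together, the task.py changes add a pool_manager argument to pipeline_run(): "multiprocessing" (the default) keeps the old ThreadPool/Pool behaviour, while "gevent" swaps in gevent.pool.Pool and gevent.queue.Queue. A minimal usage sketch, assuming gevent is installed and a task function final_task is defined elsewhere (the name is illustrative):

    # Sketch: run a pipeline on a gevent pool instead of OS threads/processes.
    from ruffus import pipeline_run

    pipeline_run([final_task],            # final_task: illustrative target task
                 multithread=8,           # parallelism still set via multithread/multiprocess
                 pool_manager="gevent")   # new in this commit; default is "multiprocessing"

Note that the gevent branch is only reached when the requested parallelism is greater than one; with a single job, no pool is created at all.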
2 changes: 2 additions & 0 deletions ruffus/test/run_all_unit_tests.cmd
@@ -91,5 +91,7 @@ echo Running test_transform_formatter.py
python -m unittest test_transform_formatter && \
echo Running test_check_if_uptodate.py && \
python -m unittest test_check_if_uptodate && \
echo Running test_pool_manager.py && \
python -m unittest test_pool_manager && \
echo DONE!!!
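The test runner now also invokes the new test_pool_manager.py. That file's contents are not included in this excerpt, so the following is purely a hypothetical illustration of the kind of check it could contain, based on the ValueError that pipeline_run() now raises for unknown managers:

    # Hypothetical sketch only -- the real test_pool_manager.py is not shown here.
    import unittest
    from ruffus import pipeline_run

    class TestPoolManager(unittest.TestCase):
        def test_unknown_pool_manager_raises(self):
            # parallelism must exceed 1 for the pool selection code to run
            with self.assertRaises(ValueError):
                pipeline_run([], multithread=2, pool_manager="no_such_pool")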

6 changes: 0 additions & 6 deletions ruffus/test/test_newstyle_regex_error_messages.py
@@ -156,12 +156,6 @@ def test_regex_out_of_range_regex_reference_error_task(infiles, outfile,
raise Exception("Should blow up first")








test_pipeline = Pipeline("test")

test_pipeline.originate(task_func = generate_initial_files1,
