From f48fe03b7c845d032e9f2bdc01d1939f2a6db8ff Mon Sep 17 00:00:00 2001 From: Thomas Steinacher Date: Wed, 17 Feb 2021 11:06:00 -0600 Subject: [PATCH] Poll for new tasks (#176) - Setting to configure a polling interval for new task to reduce load when there are many workers & tasks. Instead of subscribing to the activity channel, which results in N_WORKERS messages for each task (and then each of them polling for the task), we just poll periodically. Downside is that with polling enabled and few workers it might take up to the polling interval to pick up a new task. - Setting to disable publishing to the activity channel. This should be only used if all workers are polling. --- tasktiger/task.py | 4 ++-- tasktiger/tasktiger.py | 7 ++++++ tasktiger/worker.py | 52 ++++++++++++++++++++++++++++++------------ 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index 7e0f504e..fb72f39b 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -285,7 +285,7 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None): _key(from_state), queue, _key(from_state, queue), client=pipeline ) - if to_state == QUEUED: + if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]: pipeline.publish(_key('activity'), queue) try: @@ -361,7 +361,7 @@ def delay(self, when=None, max_queue_size=None): mode='nx', client=pipeline, ) - if state == QUEUED: + if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]: pipeline.publish(tiger._key('activity'), self.queue) pipeline.execute() diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index beb4b9c0..68f1f666 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -195,6 +195,13 @@ def init(self, connection=None, config=None, setup_structlog=False): # increase Redis storage requirements and therefore can be disabled # if that is a concern. 'STORE_TRACEBACKS': True, + # Set to > 0 to poll periodically for queues with tasks. Otherwise + # subscribe to the activity channel. Use for more efficient task + # processing with a large amount of workers. + 'POLL_TASK_QUEUES_INTERVAL': 0, + # Whether to publish new tasks to the activity channel. Only set to + # False if all the workers are polling queues. + 'PUBLISH_QUEUED_TASKS': True, } if config: self.config.update(config) diff --git a/tasktiger/worker.py b/tasktiger/worker.py index de83feb2..beb4cdb7 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -207,12 +207,27 @@ def _worker_queue_scheduled_tasks(self): # XXX: ideally this would be in the same pipeline, but we only want # to announce if there was a result. if result: - self.connection.publish(self._key('activity'), queue) + if self.config["PUBLISH_QUEUED_TASKS"]: + self.connection.publish(self._key('activity'), queue) self._did_work = True - def _wait_for_new_tasks(self, timeout=0, batch_timeout=0): + def _poll_for_queues(self): """ - Check activity channel and wait as necessary. + Refresh list of queues. + + Wait if we did not do any work. + + This is only used when using polling to get queues with queued tasks. + """ + if not self._did_work: + time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"]) + self._refresh_queue_set() + + def _pubsub_for_queues(self, timeout=0, batch_timeout=0): + """ + Check activity channel for new queues and wait as necessary. + + This is only used when using pubsub to get queues with queued tasks. This method is also used to slow down the main processing loop to reduce the effects of rapidly sending Redis commands. This method will exit @@ -223,7 +238,6 @@ def _wait_for_new_tasks(self, timeout=0, batch_timeout=0): 3. Timeout seconds have passed, this is the maximum time to stay in this method """ - new_queue_found = False start_time = batch_exit = time.time() while True: @@ -1091,6 +1105,11 @@ def _queue_periodic_tasks(self): 'queued periodic task', func=task.serialized_func, when=when ) + def _refresh_queue_set(self): + self._queue_set = set( + self._filter_queues(self.connection.smembers(self._key(QUEUED))) + ) + def run(self, once=False, force_once=False): """ Main loop of the worker. @@ -1125,21 +1144,25 @@ def run(self, once=False, force_once=False): # Then, listen to the activity channel. # XXX: This can get inefficient when having lots of queues. - self._pubsub = self.connection.pubsub() - self._pubsub.subscribe(self._key('activity')) + if self.config["POLL_TASK_QUEUES_INTERVAL"]: + self._pubsub = None + else: + self._pubsub = self.connection.pubsub() + self._pubsub.subscribe(self._key('activity')) - self._queue_set = set( - self._filter_queues(self.connection.smembers(self._key(QUEUED))) - ) + self._refresh_queue_set() try: while True: # Update the queue set on every iteration so we don't get stuck # on processing a specific queue. - self._wait_for_new_tasks( - timeout=self.config['SELECT_TIMEOUT'], - batch_timeout=self.config['SELECT_BATCH_TIMEOUT'], - ) + if self._pubsub: + self._pubsub_for_queues( + timeout=self.config['SELECT_TIMEOUT'], + batch_timeout=self.config['SELECT_BATCH_TIMEOUT'], + ) + else: + self._poll_for_queues() self._install_signal_handlers() self._did_work = False @@ -1162,5 +1185,6 @@ def run(self, once=False, force_once=False): self.stats_thread = None # Free up Redis connection - self._pubsub.reset() + if self._pubsub: + self._pubsub.reset() self.log.info('done')