Skip to content

Commit

Permalink
Poll for new tasks (#176)
Browse files Browse the repository at this point in the history
- Setting to configure a polling interval for new tasks to reduce load when there are many workers & tasks. Instead of subscribing to the activity channel, which results in N_WORKERS messages for each task (and then each of them polling for the task), we just poll periodically. The downside is that with polling enabled and few workers, it might take up to the polling interval to pick up a new task.
- Setting to disable publishing to the activity channel. This should be only used if all workers are polling.
  • Loading branch information
thomasst authored Feb 17, 2021
1 parent 6d96607 commit f48fe03
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
4 changes: 2 additions & 2 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
_key(from_state), queue, _key(from_state, queue), client=pipeline
)

if to_state == QUEUED:
if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]:
pipeline.publish(_key('activity'), queue)

try:
Expand Down Expand Up @@ -361,7 +361,7 @@ def delay(self, when=None, max_queue_size=None):
mode='nx',
client=pipeline,
)
if state == QUEUED:
if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]:
pipeline.publish(tiger._key('activity'), self.queue)
pipeline.execute()

Expand Down
7 changes: 7 additions & 0 deletions tasktiger/tasktiger.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ def init(self, connection=None, config=None, setup_structlog=False):
# increase Redis storage requirements and therefore can be disabled
# if that is a concern.
'STORE_TRACEBACKS': True,
# Set to > 0 to poll periodically for queues with tasks. Otherwise
# subscribe to the activity channel. Use for more efficient task
# processing with a large number of workers.
'POLL_TASK_QUEUES_INTERVAL': 0,
# Whether to publish new tasks to the activity channel. Only set to
# False if all the workers are polling queues.
'PUBLISH_QUEUED_TASKS': True,
}
if config:
self.config.update(config)
Expand Down
52 changes: 38 additions & 14 deletions tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,27 @@ def _worker_queue_scheduled_tasks(self):
# XXX: ideally this would be in the same pipeline, but we only want
# to announce if there was a result.
if result:
self.connection.publish(self._key('activity'), queue)
if self.config["PUBLISH_QUEUED_TASKS"]:
self.connection.publish(self._key('activity'), queue)
self._did_work = True

def _wait_for_new_tasks(self, timeout=0, batch_timeout=0):
def _poll_for_queues(self):
"""
Check activity channel and wait as necessary.
Refresh list of queues.
Wait if we did not do any work.
This is only used when using polling to get queues with queued tasks.
"""
if not self._did_work:
time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])
self._refresh_queue_set()

def _pubsub_for_queues(self, timeout=0, batch_timeout=0):
"""
Check activity channel for new queues and wait as necessary.
This is only used when using pubsub to get queues with queued tasks.
This method is also used to slow down the main processing loop to reduce
the effects of rapidly sending Redis commands. This method will exit
Expand All @@ -223,7 +238,6 @@ def _wait_for_new_tasks(self, timeout=0, batch_timeout=0):
3. Timeout seconds have passed, this is the maximum time to stay in
this method
"""

new_queue_found = False
start_time = batch_exit = time.time()
while True:
Expand Down Expand Up @@ -1091,6 +1105,11 @@ def _queue_periodic_tasks(self):
'queued periodic task', func=task.serialized_func, when=when
)

def _refresh_queue_set(self):
    """
    Rebuild ``self._queue_set`` from Redis.

    Reads the members of the QUEUED set and keeps only the queues this
    worker is configured to process (via ``_filter_queues``).
    """
    members = self.connection.smembers(self._key(QUEUED))
    self._queue_set = set(self._filter_queues(members))

def run(self, once=False, force_once=False):
"""
Main loop of the worker.
Expand Down Expand Up @@ -1125,21 +1144,25 @@ def run(self, once=False, force_once=False):
# Then, listen to the activity channel.
# XXX: This can get inefficient when having lots of queues.

self._pubsub = self.connection.pubsub()
self._pubsub.subscribe(self._key('activity'))
if self.config["POLL_TASK_QUEUES_INTERVAL"]:
self._pubsub = None
else:
self._pubsub = self.connection.pubsub()
self._pubsub.subscribe(self._key('activity'))

self._queue_set = set(
self._filter_queues(self.connection.smembers(self._key(QUEUED)))
)
self._refresh_queue_set()

try:
while True:
# Update the queue set on every iteration so we don't get stuck
# on processing a specific queue.
self._wait_for_new_tasks(
timeout=self.config['SELECT_TIMEOUT'],
batch_timeout=self.config['SELECT_BATCH_TIMEOUT'],
)
if self._pubsub:
self._pubsub_for_queues(
timeout=self.config['SELECT_TIMEOUT'],
batch_timeout=self.config['SELECT_BATCH_TIMEOUT'],
)
else:
self._poll_for_queues()

self._install_signal_handlers()
self._did_work = False
Expand All @@ -1162,5 +1185,6 @@ def run(self, once=False, force_once=False):
self.stats_thread = None

# Free up Redis connection
self._pubsub.reset()
if self._pubsub:
self._pubsub.reset()
self.log.info('done')

0 comments on commit f48fe03

Please sign in to comment.