From c097541072bcea7d5ff0e05db71ebd7b1289473a Mon Sep 17 00:00:00 2001 From: Thomas Steinacher Date: Sun, 21 Mar 2021 21:41:54 +0100 Subject: [PATCH 1/3] Ensure uniqueness by queue This avoids a "task not found" scenario where we queue a unique task into several queues. --- tasktiger/_internal.py | 9 +++++++-- tasktiger/task.py | 5 +++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py index 8dc160fb..a5e1dacb 100644 --- a/tasktiger/_internal.py +++ b/tasktiger/_internal.py @@ -57,14 +57,19 @@ def gen_id(): return binascii.b2a_hex(os.urandom(32)).decode('utf8') -def gen_unique_id(serialized_name, args, kwargs): +def gen_unique_id(queue, serialized_name, args, kwargs): """ Generates and returns a hex-encoded 256-bit ID for the given task name and args. Used to generate IDs for unique tasks or for task locks. """ return hashlib.sha256( json.dumps( - {'func': serialized_name, 'args': args, 'kwargs': kwargs}, + { + 'queue': queue, + 'func': serialized_name, + 'args': args, + 'kwargs': kwargs, + }, sort_keys=True, ).encode('utf8') ).hexdigest() diff --git a/tasktiger/task.py b/tasktiger/task.py index 2d4a8b4c..e052b906 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -91,12 +91,13 @@ def __init__( if unique or unique_key: if unique_key: task_id = gen_unique_id( + queue, serialized_name, None, {key: kwargs.get(key) for key in unique_key}, ) else: - task_id = gen_unique_id(serialized_name, args, kwargs) + task_id = gen_unique_id(queue, serialized_name, args, kwargs) else: task_id = gen_id() @@ -581,7 +582,7 @@ def _queue_for_next_period(self): # between executions task = self.clone() task._data['id'] = gen_unique_id( - task.serialized_func, task.args, task.kwargs + task.queue, task.serialized_func, task.args, task.kwargs ) task.delay(when=when) return when From 0e2a4fb9c5bd8b968794e1ac12abf4cac875d074 Mon Sep 17 00:00:00 2001 From: Thomas Steinacher Date: Sun, 21 Mar 2021 21:56:56 +0100 Subject: [PATCH 2/3] Fix periodic task tests --- tests/test_periodic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_periodic.py b/tests/test_periodic.py index 4d574645..53cc307a 100644 --- a/tests/test_periodic.py +++ b/tests/test_periodic.py @@ -143,7 +143,7 @@ def test_periodic_execution_unique_ids(self): # generate the expected unique id expected_unique_id = gen_unique_id( - serialize_func_name(periodic_task), [], {} + "periodic", serialize_func_name(periodic_task), [], {} ) # pull task out of the queue by id. If found, then the id is correct @@ -202,10 +202,10 @@ def test_periodic_execution_unique_ids_self_correct(self): # generate the ids correct_unique_id = gen_unique_id( - serialize_func_name(periodic_task), [], {} + "periodic", serialize_func_name(periodic_task), [], {} ) malformed_unique_id = gen_unique_id( - serialize_func_name(periodic_task), None, None + "periodic", serialize_func_name(periodic_task), None, None ) task = Task(tiger, func=periodic_task) From c7d589744374608b16c064457880bc7c558d25da Mon Sep 17 00:00:00 2001 From: Thomas Steinacher Date: Sun, 21 Mar 2021 22:00:48 +0100 Subject: [PATCH 3/3] Don't change key for locks --- tasktiger/_internal.py | 17 ++++++++--------- tasktiger/worker.py | 3 ++- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py index a5e1dacb..35204385 100644 --- a/tasktiger/_internal.py +++ b/tasktiger/_internal.py @@ -62,16 +62,15 @@ def gen_unique_id(queue, serialized_name, args, kwargs): Generates and returns a hex-encoded 256-bit ID for the given task name and args. Used to generate IDs for unique tasks or for task locks. """ + data = { + 'func': serialized_name, + 'args': args, + 'kwargs': kwargs, + } + if queue is not None: + data['queue'] = queue return hashlib.sha256( - json.dumps( - { - 'queue': queue, - 'func': serialized_name, - 'args': args, - 'kwargs': kwargs, - }, - sort_keys=True, - ).encode('utf8') + json.dumps(data, sort_keys=True).encode('utf8') ).hexdigest() diff --git a/tasktiger/worker.py b/tasktiger/worker.py index beb4cdb7..66e12e89 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -857,13 +857,14 @@ def _execute_task_group(self, queue, tasks, all_task_ids, queue_lock): if task.lock_key: kwargs = task.kwargs lock_id = gen_unique_id( + None, task.serialized_func, None, {key: kwargs.get(key) for key in task.lock_key}, ) else: lock_id = gen_unique_id( - task.serialized_func, task.args, task.kwargs + None, task.serialized_func, task.args, task.kwargs ) if lock_id not in lock_ids: