Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include queue name in unique task ID generation #181

Open
wants to merge 3 commits into
base: task-unique-key
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions tasktiger/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,20 @@ def gen_id():
return binascii.b2a_hex(os.urandom(32)).decode('utf8')


def gen_unique_id(serialized_name, args, kwargs):
def gen_unique_id(queue, serialized_name, args, kwargs):
"""
Generates and returns a hex-encoded 256-bit ID for the given task name and
args. Used to generate IDs for unique tasks or for task locks.
"""
data = {
'func': serialized_name,
'args': args,
'kwargs': kwargs,
}
if queue is not None:
data['queue'] = queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to serialize an explicit null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently use the same function for to compute the lock ID (in which case we pass None for the queue). It would break existing locks if we serialized an explicit null. I can add a comment for this.

return hashlib.sha256(
json.dumps(
{'func': serialized_name, 'args': args, 'kwargs': kwargs},
sort_keys=True,
).encode('utf8')
json.dumps(data, sort_keys=True).encode('utf8')
).hexdigest()


Expand Down
5 changes: 3 additions & 2 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ def __init__(
if unique or unique_key:
if unique_key:
task_id = gen_unique_id(
queue,
serialized_name,
None,
{key: kwargs.get(key) for key in unique_key},
)
else:
task_id = gen_unique_id(serialized_name, args, kwargs)
task_id = gen_unique_id(queue, serialized_name, args, kwargs)
else:
task_id = gen_id()

Expand Down Expand Up @@ -581,7 +582,7 @@ def _queue_for_next_period(self):
# between executions
task = self.clone()
task._data['id'] = gen_unique_id(
task.serialized_func, task.args, task.kwargs
task.queue, task.serialized_func, task.args, task.kwargs
)
task.delay(when=when)
return when
3 changes: 2 additions & 1 deletion tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,13 +857,14 @@ def _execute_task_group(self, queue, tasks, all_task_ids, queue_lock):
if task.lock_key:
kwargs = task.kwargs
lock_id = gen_unique_id(
None,
task.serialized_func,
None,
{key: kwargs.get(key) for key in task.lock_key},
)
else:
lock_id = gen_unique_id(
task.serialized_func, task.args, task.kwargs
None, task.serialized_func, task.args, task.kwargs
)

if lock_id not in lock_ids:
Expand Down
6 changes: 3 additions & 3 deletions tests/test_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def test_periodic_execution_unique_ids(self):

# generate the expected unique id
expected_unique_id = gen_unique_id(
serialize_func_name(periodic_task), [], {}
"periodic", serialize_func_name(periodic_task), [], {}
)

# pull task out of the queue by id. If found, then the id is correct
Expand Down Expand Up @@ -202,10 +202,10 @@ def test_periodic_execution_unique_ids_self_correct(self):

# generate the ids
correct_unique_id = gen_unique_id(
serialize_func_name(periodic_task), [], {}
"periodic", serialize_func_name(periodic_task), [], {}
)
malformed_unique_id = gen_unique_id(
serialize_func_name(periodic_task), None, None
"periodic", serialize_func_name(periodic_task), None, None
)

task = Task(tiger, func=periodic_task)
Expand Down