From 677dbcb728b12cc20ce28b7623d615accca16a1f Mon Sep 17 00:00:00 2001 From: Alan Hamlett Date: Sun, 12 Nov 2023 08:03:20 +0100 Subject: [PATCH] Prevent lost tasks when OOM kills parent worker process bugfix --- wakaq/worker.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/wakaq/worker.py b/wakaq/worker.py index 1fdc676..e300065 100644 --- a/wakaq/worker.py +++ b/wakaq/worker.py @@ -249,19 +249,18 @@ def _child(self, stdout, pingout, broadcastin): task = None if task is not None: + queue = self.wakaq.queues_by_key[queue_broker_key] + current_task.set((task, payload)) + retry = payload.get("retry") or 0 # make sure parent process is still around (OOM killer may have stopped it without sending child signal) try: - self._send_ping_to_parent() + self._send_ping_to_parent(task_name=task.name, queue_name=queue.name if queue else None) except: # give task back to queue so it's not lost self.wakaq.broker.lpush(queue_broker_key, serialize(payload)) raise - queue = self.wakaq.queues_by_key[queue_broker_key] - current_task.set((task, payload)) - retry = payload.get("retry") or 0 - try: self._execute_task(task, payload, queue=queue) @@ -402,7 +401,6 @@ def _enqueue_ready_eta_tasks(self): self.wakaq._enqueue_at_front(task_name, queue.name, args, kwargs) def _execute_task(self, task, payload, queue=None): - self._send_ping_to_parent(task_name=task.name, queue_name=queue.name if queue else None) log.debug(f"running with payload {payload}") if callable(self.wakaq.before_task_started_callback): self.wakaq.before_task_started_callback() @@ -433,6 +431,7 @@ def _execute_broadcast_tasks(self): try: while True: try: + self._send_ping_to_parent(task_name=task.name) self._execute_task(task, payload) break