Skip to content

Commit

Permalink
Prevent lost tasks when OOM kills parent worker process bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhamlett committed Nov 12, 2023
1 parent bb4f26e commit 677dbcb
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions wakaq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,19 +249,18 @@ def _child(self, stdout, pingout, broadcastin):
task = None

if task is not None:
queue = self.wakaq.queues_by_key[queue_broker_key]
current_task.set((task, payload))
retry = payload.get("retry") or 0

# make sure parent process is still around (OOM killer may have stopped it without sending child signal)
try:
self._send_ping_to_parent()
self._send_ping_to_parent(task_name=task.name, queue_name=queue.name if queue else None)
except:
# give task back to queue so it's not lost
self.wakaq.broker.lpush(queue_broker_key, serialize(payload))
raise

queue = self.wakaq.queues_by_key[queue_broker_key]
current_task.set((task, payload))
retry = payload.get("retry") or 0

try:
self._execute_task(task, payload, queue=queue)

Expand Down Expand Up @@ -402,7 +401,6 @@ def _enqueue_ready_eta_tasks(self):
self.wakaq._enqueue_at_front(task_name, queue.name, args, kwargs)

def _execute_task(self, task, payload, queue=None):
self._send_ping_to_parent(task_name=task.name, queue_name=queue.name if queue else None)
log.debug(f"running with payload {payload}")
if callable(self.wakaq.before_task_started_callback):
self.wakaq.before_task_started_callback()
Expand Down Expand Up @@ -433,6 +431,7 @@ def _execute_broadcast_tasks(self):
try:
while True:
try:
self._send_ping_to_parent(task_name=task.name)
self._execute_task(task, payload)
break

Expand Down

0 comments on commit 677dbcb

Please sign in to comment.