Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Redis connections after reconnect - consumer starts consuming the tasks after crash. #2007

Merged
merged 4 commits into from
Jun 12, 2024

Conversation

awmackowiak
Copy link
Contributor

@awmackowiak awmackowiak commented May 26, 2024

TL;DR
After updating to version 5.2.3, Celery workers stop processing tasks indefinitely after losing connection to the Redis broker.
This issue does not occur in version 5.2.2. Analysis revealed that the problem is related to the removal of new Redis connections from the on_tick method and the behaviour of the __del__ method, which removes these connections before they are properly registered.
The solution is to modify the logic for removing connections from on_tick so that only connections that were properly added to the poller are removed, thereby resolving the reconnection issue.

This cannot be made without @ziollek because he figure out the GC problem and proposed a simple solution.
Big thanks to him :)

Full Description
Problem
After losing connection to the Redis broker, the Celery worker stops consuming tasks indefinitely. This issue didn't occur previously, but it started happening again after version 5.2.3. The discussion is extensive but reveals a few key problems:

So, after a deep dive into the topic, we (with Ziolek) have three main questions to answer:

  1. What was the change in version 5.2.3?
  2. Why does a couple of restarts reveal the problem?
  3. Why doesn’t the problem occur on the first reconnection?

Analysis
First, we used our VaaS environment to simulate Redis instance restarts. After a few restarts, the Celery worker wouldn’t consume tasks from Redis, so we easily reproduced the error. The problem seems to be a race condition because of non-deterministic behaviour. Sometimes the reconnection cannot be handled after one or two restarts, but sometimes (if you restart it quickly) the reconnection problem occurs even after 10 restarts.
Secondly, we compared the differences between versions 5.2.2 and 5.2.3 and tried to find the main change causing the problem:
v5.2.2...v5.2.3#diff-203e309d100714904255d0af4500dd295e40e4ffedfdc9295ab3b386718ca87eR1241-R1247.
One of the features added in this version was logic responsible for removing the connection to Redis from the on_tick method if the file descriptors (fds) are added to the set. This was added to fix a memory leak (because earlier on_tick was only adding items and never removing them), but it started causing the reconnection problem.

You may ask why it broken? The simple answer is: because it removes fresh connections to Redis from the connection pool.

How it Works:
After adding more logs and restarting Redis, we see that Celery tries to reconnect. To do that, it creates a new connection (channel) and checks if it can ping the server. If yes, it is added to the poller and on_tick. If the channel isn’t connected, it raises an exception that the connection cannot be established. On the next attempt, the process looks the same.

Here we start asking a question: what happened to those orphan connections? The answer is: the garbage collector (GC) takes place.

The reason why this was non-deterministic is that the GC runs when it can, so it looks like a race condition.

From this, we can figure out how it really works from the beginning:

  1. The channel is created but cannot establish a connection to Redis, so it becomes an orphan.
  2. The Redis server starts responding.
  3. Celery starts the second attempt, creates a new channel, and checks if it can ping Redis. Now it works, so the channel is added to on_tick and the poll, and the rest of the consumers try to start.
  4. Meanwhile, the GC gets processor time and starts to remove unused objects by calling the del method if defined.
  5. The del method runs reset().
  6. The reset function launches disconnect().
  7. The disconnect function launches _on_disconnect.
  8. The _on_disconnect method removes the newly created, synchronised pub/sub connection to Redis from on_tick.
  9. The server is missing mingle and stops consuming the task.
  10. It hangs because the Celery cycle works properly but without synchronised functions attached to the set.

Solution
We can remove the channel from on_tick only when it was properly added to the poller. To do this, we can start marking the connection that was added to the poll, and in the disconnect function, we only try to remove the connection object that was added to the poller. After adding this logic, the reconnection problem stops occurring.

Copy link
Member

@auvipy auvipy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the in depth analysis and a fix. however can you please look into the integration tests, and why they are failing?

@auvipy
Copy link
Member

auvipy commented May 27, 2024

FAILED t/integration/test_py_amqp.py::test_PyAMQPBasicFunctionality::test_connect - ConnectionRefusedError: [Errno 111] Connection refused

might not be related to the fix, right?

@ziollek
Copy link

ziollek commented May 27, 2024

FAILED t/integration/test_py_amqp.py::test_PyAMQPBasicFunctionality::test_connect - ConnectionRefusedError: [Errno 111] Connection refused

might not be related to the fix, right?

As you can see the change affects only redis transport - so I assume it is false-positive.
Take a look - for other PR-s there is the same problem reported - for example https://github.com/celery/kombu/pull/2006/checks

@pawl
Copy link
Contributor

pawl commented May 27, 2024

Nice work! I started to look into a solution here but didn’t figure it out: #1734

@awmackowiak
Copy link
Contributor Author

awmackowiak commented May 27, 2024

@auvipy @Nusnus @pawl
20 days ago the new version of rabbitmq docker has been released.
https://hub.docker.com/_/rabbitmq
I downgrade the image to 3.12.14

diff --git a/tox.ini b/tox.ini
index 243c85d9..4c3e9230 100644
--- a/tox.ini
+++ b/tox.ini
@@ -63,7 +63,7 @@ dockerenv =
     RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit tcp_listeners [5672]

 [docker:rabbitmq]
-image = rabbitmq
+image = rabbitmq:3.12.14
 ports = 5672:5672/tcp
 healthcheck_cmd = /bin/bash -c 'rabbitmq-diagnostics ping -q'
 healthcheck_interval = 10

and the test passed with success

Results (38.83s):
      39 passed
     104 skipped
3.10-linux-integration-py-amqp: exit 0 (40.46 seconds) /Users/artur.mackowiak/Projects/Private/kombu> pytest -xv -E py-amqp t/integration -v pid=61355
3.10-linux-integration-py-amqp: docker> remove 'f895aeaa812b' (from 'rabbitmq')
  3.10-linux-integration-py-amqp: OK (118.33=setup[77.87]+cmd[40.46] seconds)
  congratulations :) (118.55 seconds)

@awmackowiak
Copy link
Contributor Author

Nice work! I started to look into a solution here but didn’t figure it out: #1734

Yes, we have read this PR and it gives us nice insight into fixing the real reason. :)

@auvipy
Copy link
Member

auvipy commented May 27, 2024

@auvipy @Nusnus @pawl 20 days ago the new version of rabbitmq docker has been released. https://hub.docker.com/_/rabbitmq I downgrade the image to 3.12.14

diff --git a/tox.ini b/tox.ini
index 243c85d9..4c3e9230 100644
--- a/tox.ini
+++ b/tox.ini
@@ -63,7 +63,7 @@ dockerenv =
     RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit tcp_listeners [5672]

 [docker:rabbitmq]
-image = rabbitmq
+image = rabbitmq:3.12.14
 ports = 5672:5672/tcp
 healthcheck_cmd = /bin/bash -c 'rabbitmq-diagnostics ping -q'
 healthcheck_interval = 10

and the test passed with success

Results (38.83s):
      39 passed
     104 skipped
3.10-linux-integration-py-amqp: exit 0 (40.46 seconds) /Users/artur.mackowiak/Projects/Private/kombu> pytest -xv -E py-amqp t/integration -v pid=61355
3.10-linux-integration-py-amqp: docker> remove 'f895aeaa812b' (from 'rabbitmq')
  3.10-linux-integration-py-amqp: OK (118.33=setup[77.87]+cmd[40.46] seconds)
  congratulations :) (118.55 seconds)

would you mind checking this PR #2008

@awmackowiak
Copy link
Contributor Author

Due to docker/docker-py#3256 there is a need to hard-code the requests version to <=2.31.0.
5 days ago they released the newest version and the integration tests started failing that day

@@ -2,3 +2,6 @@ typing_extensions; python_version<"3.10"
amqp>=5.1.1,<6.0.0
vine
backports.zoneinfo[tzdata]>=0.2.1; python_version < '3.9'
# due to this bug https://github.com/docker/docker-py/issues/3256
# we need to hard-code the version of requests
requests<2.32.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bug is causing such a huge mess, sigh...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to extract whatever handling we do with this requests/docker-py bug to a different PR.
I don’t want it to clutter the PR and specifically not this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.coverage.polpc09554.88892.XPPcvqDx Outdated Show resolved Hide resolved
kombu/transport/redis.py Show resolved Hide resolved
@Nusnus
Copy link
Member

Nusnus commented May 29, 2024

@awmackowiak
Per #2011

  1. Please remove the changes in default.txt.
  2. Please remove the changes in tox.ini
    This will resolve the conflicts with main
  3. Please rebase on main.
  4. Please remove .coverage.polpc09554.88892.XPPcvqDx from the PR.

This should extract any non-redis stuff out of the PR.

@auvipy it seems #2008 is failing due to the requests/docker issue.
CleanShot 2024-05-29 at 21 25 29

I’ve tried to lower it like you did, but it appears to have done nothing, so I reverted the change.

This is why I wanted it all to be separated as this issue caused really confusing stuff.

All in all, once @awmackowiak updates this PR, we can focus on the redis bugfix only and avoid wasting time and effort on non-related issues.

Lastly, assuming this PR does fix the infamous bug (celery/celery#7276), and once it gets fully reviewed & ready for merge, I want to provide an objective method to validate this fix works using pytest-celery, which I will configure to run with Celery main and this PR for Kombu (before merging to main), so if it does work (from Celery, e.g., from the user’s perspective) we can merge, release a new Kombu patch version, and probably (if there isn’t any breaking changes in Celery main) also Celery v5.4.1 with Redis working again.

@awmackowiak
Copy link
Contributor Author

@Nusnus
I have updated the branch. You please, run tests again?

@Nusnus
Copy link
Member

Nusnus commented May 30, 2024

@Nusnus
I have updated the branch. You please, run tests again?

It looks like the CI is fully passing, good job! 🚀

@Nusnus
Copy link
Member

Nusnus commented Jun 3, 2024

@awmackowiak can we consider the current PR “final/ready” from your POV?
I’d like to do some tests before merging so I want to know if there’s something else that needs to be done before I go through with it

Lastly, assuming this PR does fix the infamous bug (celery/celery#7276), and once it gets fully reviewed & ready for merge, I want to provide an objective method to validate this fix works using pytest-celery, which I will configure to run with Celery main and this PR for Kombu (before merging to main), so if it does work (from Celery, e.g., from the user’s perspective) we can merge, release a new Kombu patch version, and probably (if there isn’t any breaking changes in Celery main) also Celery v5.4.1 with Redis working again.

@awmackowiak
Copy link
Contributor Author

@awmackowiak can we consider the current PR “final/ready” from your POV? I’d like to do some tests before merging so I want to know if there’s something else that needs to be done before I go through with it

Lastly, assuming this PR does fix the infamous bug (celery/celery#7276), and once it gets fully reviewed & ready for merge, I want to provide an objective method to validate this fix works using pytest-celery, which I will configure to run with Celery main and this PR for Kombu (before merging to main), so if it does work (from Celery, e.g., from the user’s perspective) we can merge, release a new Kombu patch version, and probably (if there isn’t any breaking changes in Celery main) also Celery v5.4.1 with Redis working again.

From my perspective the PR is completed and ready for tests. 🟢

@Nusnus
Copy link
Member

Nusnus commented Jun 12, 2024

@awmackowiak can we consider the current PR “final/ready” from your POV? I’d like to do some tests before merging so I want to know if there’s something else that needs to be done before I go through with it

Lastly, assuming this PR does fix the infamous bug (celery/celery#7276), and once it gets fully reviewed & ready for merge, I want to provide an objective method to validate this fix works using pytest-celery, which I will configure to run with Celery main and this PR for Kombu (before merging to main), so if it does work (from Celery, e.g., from the user’s perspective) we can merge, release a new Kombu patch version, and probably (if there isn’t any breaking changes in Celery main) also Celery v5.4.1 with Redis working again.

From my perspective the PR is completed and ready for tests. 🟢

TL;DR
It looks like this indeed fixes the bug! @awmackowiak @auvipy 🚀🔥
Unfortunately, it caused me to find a bug in pytest-celery, so I had to test it manually after all🤦‍♂️ - details below!

P.S
This makes my other fix (celery/celery#8796) irrelevant - tagging it in case someone reviews it there to make the link to the actual fix, although we’ll communicate it in the release notes of course!

Manual Tests

myapp.py

Used for the worker & shell

from celery import Celery, shared_task

app = Celery("myapp", broker="redis://")


@shared_task
def identity(x):
    """Return the argument."""
    return x


if __name__ == "__main__":
    app.start()

celery shell

Python 3.12.1 (main, Dec 17 2023, 21:54:54) [Clang 15.0.0 (clang-1500.1.0.2.5)]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.24.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from myapp import identity

In [2]: sig = identity.si("sanity").delay()

In [3]: sig = identity.si("sanity again").delay()

In [4]: sig = identity.si("redis was restared once").delay()

In [5]: sig = identity.si("redis was restared once - trying again").delay()

In [6]: sig = identity.si("redis was restared twice").delay()

In [7]: sig = identity.si("redis was restared twice - trying again").delay()

In [8]: sig = identity.si("redis was restared 3 times").delay()

In [9]: sig = identity.si("redis was restared 3 times - looking good!").delay()

In [10]: sig = identity.si("redis was restared 3 times - @awmackowiak is awesome").delay()

In [11]: sig = identity.si("redis was restared 4 times").delay()

In [12]: sig = identity.si("redis was restared 4 times - are you not entertained??").delay()

celery -A myapp worker -l INFO

celery -A myapp worker -l INFO
[2024-06-12 02:31:10,937: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'

 -------------- celery@Tomers-MacBook-Pro.local v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- macOS-14.5-arm64-arm-64bit 2024-06-12 02:31:10
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x106cdfa70
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 10 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . myapp.identity

[2024-06-12 02:31:11,074: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:31:11,085: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:31:11,085: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-12 02:31:11,086: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:31:11,089: INFO/MainProcess] mingle: searching for neighbors
[2024-06-12 02:31:11,090: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:31:12,103: INFO/MainProcess] mingle: all alone
[2024-06-12 02:31:12,134: INFO/MainProcess] celery@Tomers-MacBook-Pro.local ready.
[2024-06-12 02:31:19,872: INFO/MainProcess] Task myapp.identity[902da403-d567-48a0-ad03-5eacdab06f92] received
[2024-06-12 02:31:19,873: INFO/ForkPoolWorker-8] Task myapp.identity[902da403-d567-48a0-ad03-5eacdab06f92] succeeded in 0.0002709999680519104s: 'sanity'
[2024-06-12 02:31:35,891: INFO/MainProcess] Task myapp.identity[ec27282f-dd25-481b-a3b1-120566d744e7] received
[2024-06-12 02:31:35,892: INFO/ForkPoolWorker-8] Task myapp.identity[ec27282f-dd25-481b-a3b1-120566d744e7] succeeded in 0.00011008395813405514s: 'sanity again'
[2024-06-12 02:31:44,294: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
    self.cycle.on_readable(fileno)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
    chan.handlers[type]()
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 965, in _brpop_read
    dest__item = self.client.parse_response(self.client.connection,
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 562, in parse_response
    response = connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
    self._read_from_socket()
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-12 02:31:44,309: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.

  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)

[2024-06-12 02:31:44,311: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:31:44,311: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:31:44,312: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)

[2024-06-12 02:31:46,331: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:31:46,331: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-12 02:31:46,332: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:31:46,341: INFO/MainProcess] mingle: searching for neighbors
[2024-06-12 02:31:47,347: INFO/MainProcess] mingle: all alone
[2024-06-12 02:32:05,055: INFO/MainProcess] Task myapp.identity[2d8049fc-dee6-42db-b51d-8c988166a2b1] received
[2024-06-12 02:32:05,055: INFO/ForkPoolWorker-8] Task myapp.identity[2d8049fc-dee6-42db-b51d-8c988166a2b1] succeeded in 4.4209184125065804e-05s: 'redis was restared once'
[2024-06-12 02:32:22,508: INFO/MainProcess] Task myapp.identity[5f8c626a-3566-4d31-8c69-f4c53b08cfd6] received
[2024-06-12 02:32:22,509: INFO/ForkPoolWorker-8] Task myapp.identity[5f8c626a-3566-4d31-8c69-f4c53b08cfd6] succeeded in 4.833308048546314e-05s: 'redis was restared once - trying again'
[2024-06-12 02:32:30,270: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
    self.cycle.on_readable(fileno)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
    chan.handlers[type]()
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 916, in _receive
    ret.append(self._receive_one(c))
               ^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 926, in _receive_one
    response = c.parse_response()
               ^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 837, in parse_response
    response = self._execute(conn, try_read)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 813, in _execute
    return conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/retry.py", line 49, in call_with_retry
    fail(error)
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 815, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 802, in _disconnect_raise_connect
    raise error
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
           ^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 814, in <lambda>
    lambda: command(*args, **kwargs),
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 835, in try_read
    return conn.read_response(disconnect_on_error=False, push_request=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
    self._read_from_socket()
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-12 02:32:30,279: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.

  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)

[2024-06-12 02:32:30,282: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:32:30,283: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:32:30,283: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)

[2024-06-12 02:32:32,293: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:32:32,293: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 4.00 seconds... (2/100)

[2024-06-12 02:32:36,321: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:32:36,322: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-12 02:32:36,323: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:32:36,338: INFO/MainProcess] mingle: searching for neighbors
[2024-06-12 02:32:37,364: INFO/MainProcess] mingle: all alone
[2024-06-12 02:32:46,024: INFO/MainProcess] Task myapp.identity[f10b8a77-31fe-493d-83ae-aaafeab465c7] received
[2024-06-12 02:32:46,025: INFO/ForkPoolWorker-8] Task myapp.identity[f10b8a77-31fe-493d-83ae-aaafeab465c7] succeeded in 8.345884270966053e-05s: 'redis was restared twice'
[2024-06-12 02:32:56,776: INFO/MainProcess] Task myapp.identity[adb54663-acba-4ad3-b3ed-68e30593a756] received
[2024-06-12 02:32:56,776: INFO/ForkPoolWorker-8] Task myapp.identity[adb54663-acba-4ad3-b3ed-68e30593a756] succeeded in 8.983397856354713e-05s: 'redis was restared twice - trying again'
[2024-06-12 02:33:05,115: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
    self.cycle.on_readable(fileno)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
    chan.handlers[type]()
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 965, in _brpop_read
    dest__item = self.client.parse_response(self.client.connection,
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 562, in parse_response
    response = connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
    self._read_from_socket()
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-12 02:33:05,118: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.

  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)

[2024-06-12 02:33:05,121: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:33:05,124: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:33:05,124: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)

[2024-06-12 02:33:07,131: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:33:07,131: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 4.00 seconds... (2/100)

[2024-06-12 02:33:11,145: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:33:11,146: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 6.00 seconds... (3/100)

[2024-06-12 02:33:17,171: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:33:17,171: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 8.00 seconds... (4/100)

[2024-06-12 02:33:25,199: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:33:25,200: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 10.00 seconds... (5/100)

[2024-06-12 02:33:35,248: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:33:35,249: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-12 02:33:35,249: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:33:35,254: INFO/MainProcess] mingle: searching for neighbors
[2024-06-12 02:33:36,265: INFO/MainProcess] mingle: all alone
[2024-06-12 02:33:45,560: INFO/MainProcess] Task myapp.identity[101a6b3a-3124-4618-a302-28498e8989fa] received
[2024-06-12 02:33:45,561: INFO/ForkPoolWorker-8] Task myapp.identity[101a6b3a-3124-4618-a302-28498e8989fa] succeeded in 4.6625034883618355e-05s: 'redis was restared 3 times'
[2024-06-12 02:34:16,291: INFO/MainProcess] Task myapp.identity[fc8c6ec6-cbcb-4e5a-b99b-b56794c9da2e] received
[2024-06-12 02:34:16,291: INFO/ForkPoolWorker-8] Task myapp.identity[fc8c6ec6-cbcb-4e5a-b99b-b56794c9da2e] succeeded in 5.1040900871157646e-05s: 'redis was restared 3 times - looking good!'
[2024-06-12 02:34:38,051: INFO/MainProcess] Task myapp.identity[7f82ed55-9eca-4e69-963f-d784c48bc404] received
[2024-06-12 02:34:38,052: INFO/ForkPoolWorker-8] Task myapp.identity[7f82ed55-9eca-4e69-963f-d784c48bc404] succeeded in 8.870800957083702e-05s: 'redis was restared 3 times - @awmackowiak is awesome'
[2024-06-12 02:34:50,542: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
    self.cycle.on_readable(fileno)
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
    chan.handlers[type]()
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 916, in _receive
    ret.append(self._receive_one(c))
               ^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 926, in _receive_one
    response = c.parse_response()
               ^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 837, in parse_response
    response = self._execute(conn, try_read)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 813, in _execute
    return conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/retry.py", line 49, in call_with_retry
    fail(error)
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 815, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 802, in _disconnect_raise_connect
    raise error
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
           ^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 814, in <lambda>
    lambda: command(*args, **kwargs),
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 835, in try_read
    return conn.read_response(disconnect_on_error=False, push_request=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
    self._read_from_socket()
  File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-12 02:34:50,546: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.

  warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)

[2024-06-12 02:34:50,550: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:34:50,556: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:34:50,557: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)

[2024-06-12 02:34:52,575: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-12 02:34:52,576: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-12 02:34:52,576: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-06-12 02:34:52,581: INFO/MainProcess] mingle: searching for neighbors
[2024-06-12 02:34:53,595: INFO/MainProcess] mingle: all alone
[2024-06-12 02:35:13,928: INFO/MainProcess] Task myapp.identity[3f4484e3-ba98-4fb2-9f2a-7f17d50fd92e] received
[2024-06-12 02:35:13,929: INFO/ForkPoolWorker-8] Task myapp.identity[3f4484e3-ba98-4fb2-9f2a-7f17d50fd92e] succeeded in 9.645894169807434e-05s: 'redis was restared 4 times'
[2024-06-12 02:36:32,718: INFO/MainProcess] Task myapp.identity[9916015f-f3da-460e-876f-38ad7afcdddb] received
[2024-06-12 02:36:32,719: INFO/ForkPoolWorker-8] Task myapp.identity[9916015f-f3da-460e-876f-38ad7afcdddb] succeeded in 9.929109364748001e-05s: 'redis was restared 4 times - are you not entertained??'
^C
worker: Hitting Ctrl+C again will terminate all running tasks!

worker: Warm shutdown (MainProcess)

Kombu setup

Installed using pip install -e ../kombu/

pip list | grep kombu
kombu                                   5.3.7       /Users/nusnus/dev/GitHub/kombu
git branch | cat
  main
* pr/2007
git show | cat
commit 2c2bb319ea954c95ca81ce3e5ff28f994520309d
Author: Artur Maćkowiak <awmackowiak@gmail.com>
Date:   Wed May 22 06:56:46 2024 +0200

    Change the comment

diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index cba56ee1..9311ecf5 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -1204,7 +1204,7 @@ class Channel(virtual.Channel):
             class Connection(connection_cls):
                 def disconnect(self, *args):
                     super().disconnect(*args)
-                    # We only remove the connection from the poller
+                    # We remove the connection from the poller
                     # only if it has been added properly.
                     if channel._registered:
                         channel._on_connection_disconnect(self)

Automatic Tests

I’ll share the WIP I did so far, at least for the sake of documentation.
This test should pass, but obviously, it only confirms a single restart, which is not enough.
This is due to a bug where restarting the redis broker container gives it a new port whereas kombu still has a connection to the previous container port and does not publish tasks after the second restart.
I am too tired and too busy to handle this at the moment, but here’s what I got.

Celery Worker Dockerfile

FROM python:3.11-slim-buster

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies
RUN apt-get update && apt-get install -y build-essential \
    git \
    wget \
    make \
    curl \
    apt-utils \
    debconf \
    lsb-release \
    libmemcached-dev \
    libffi-dev \
    ca-certificates \
    pypy3 \
    pypy3-lib \
    sudo

# Set arguments
ARG CELERY_VERSION=""
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=celery_test_worker
ARG CELERY_WORKER_QUEUE=celery
ENV WORKER_VERSION=$CELERY_VERSION
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

EXPOSE 5678

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade \
    pip \
    celery[redis]${WORKER_VERSION:+==$WORKER_VERSION} \
    pytest-celery>=1.0.0

# Install kombu from the PR: https://github.com/celery/kombu/pull/2007
WORKDIR /tmp

RUN git clone https://github.com/celery/kombu.git && \
    cd kombu && \
    git fetch origin pull/2007/head:pr-2007 && \
    git checkout pr-2007 && \
    pip install -e .

# The workdir must be /app
WORKDIR /app

# Switch to the test_user
USER test_user

# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE

test_redis_fix.py

from __future__ import annotations

import pytest
from pytest_celery import (DEFAULT_WORKER_CONTAINER_TIMEOUT, DEFAULT_WORKER_VOLUME, CeleryBackendCluster,
                           CeleryBrokerCluster, CeleryTestSetup, CeleryWorkerContainer, RedisTestBackend,
                           RedisTestBroker, ping)
from pytest_docker_tools import build, container, fxtr

from celery.canvas import Signature
from celery.result import AsyncResult

###############################################################################
# Redis Broker
###############################################################################


@pytest.fixture
def celery_broker_cluster(celery_redis_broker: RedisTestBroker) -> CeleryBrokerCluster:
    cluster = CeleryBrokerCluster(celery_redis_broker)
    yield cluster
    cluster.teardown()


@pytest.fixture
def default_redis_broker_image() -> str:
    return "redis:latest"


###############################################################################
# Redis Result Backend
###############################################################################


@pytest.fixture
def celery_backend_cluster(celery_redis_backend: RedisTestBackend) -> CeleryBackendCluster:
    cluster = CeleryBackendCluster(celery_redis_backend)
    yield cluster
    cluster.teardown()


@pytest.fixture
def default_redis_backend_image() -> str:
    return "redis:latest"


###############################################################################
# Worker Configuration
###############################################################################


class WorkerContainer(CeleryWorkerContainer):
    @classmethod
    def log_level(cls) -> str:
        return "INFO"

    @classmethod
    def version(cls) -> str:
        return "5.4.0"

    # @classmethod
    # def initial_env(
    #     cls, celery_worker_cluster_config: dict, initial: dict | None = None
    # ) -> dict:
    #     initial_env = super().initial_env(
    #         celery_worker_cluster_config,
    #         initial,
    #     )
    #     # This redis instance needs to be on the same network as the worker
    #     initial_env['CELERY_BROKER_URL'] = "redis://localhost:6379/0"
    #     return initial_env


@pytest.fixture
def default_worker_container_cls() -> type[CeleryWorkerContainer]:
    return WorkerContainer


@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
    return WorkerContainer


my_celery_image = build(
    path=".",
    # dockerfile="t/unit/tomer/Dockerfile",
    dockerfile="Dockerfile",
    tag="my_celery_image:kombu_issue_2007",
    buildargs=WorkerContainer.buildargs(),
)

default_worker_container = container(
    image="{my_celery_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    wrapper_class=WorkerContainer,
    timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=fxtr("default_worker_command"),
)


###############################################################################
# Bug Reproduction
###############################################################################


def test_kombu_issue_2007(celery_setup: CeleryTestSetup):
    # Sanity
    sig: Signature = ping.s()
    res: AsyncResult = sig.apply_async()
    assert res.get(timeout=10) == "pong"

    celery_setup.broker.restart()
    sig: Signature = ping.s()
    res: AsyncResult = sig.apply_async()
    assert res.get(timeout=10) == "pong"

    # celery_setup.broker.restart()
    # sig: Signature = ping.s()
    # # Bug with pytest-celery, kombu tries to publish to incorrect redis port
    # res: AsyncResult = sig.apply_async()
    # assert res.get(timeout=10) == "pong"

How to run

  1. pip install -U pytest-celery
  2. pytest -xsv test_redis_fix.py
pytest -xsv test_redis_fix.py
======================================================================================= test session starts =======================================================================================
platform darwin -- Python 3.12.1, pytest-8.2.0, pluggy-1.5.0 -- /Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/bin/python3.12
cachedir: .pytest_cache
rootdir: /Users/nusnus/dev/GitHub/celery
configfile: pyproject.toml
plugins: docker-tools-3.1.3, celery-1.0.0, cov-5.0.0, github-actions-annotate-failures-0.2.0, click-1.1.0, anyio-4.3.0, subtests-0.12.1, rerunfailures-14.0, order-1.2.1, timeout-2.3.1
collected 1 item

test_redis_fix.py::test_kombu_issue_2007[celery_setup_worker] Creating network pytest-7f768578-5b03-4beb-80fe-a9175f117dfe
Waiting for container to be ready.RedisContainer::vigorous_shaw is ready.

Waiting for container to be ready.RedisContainer::nervous_joliot is ready.

Creating volume pytest-2c958c61-0291-4ea6-9616-ed85c88e00a3
Building ...................................................................................
Waiting for container to be ready.Waiting for WorkerContainer::happy_raman to get ready........
WorkerContainer::happy_raman is ready.

Waiting for container to be ready after restart.RedisContainer::vigorous_shaw is ready.

PASSED

======================================================================================= 1 passed in 11.88s ========================================================================================

P.S
I was too lazy to create a new project for this script, so I added a folder in t/unit called tomer and placed both files there to allow me to debug it with vscode using my celery development setup.

@ziollek
Copy link

ziollek commented Jun 12, 2024

In [10]: sig = identity.si("redis was restared 3 times - @awmackowiak is awesome").delay()

❤️

@Nusnus
Copy link
Member

Nusnus commented Jun 22, 2024

Fix released in v5.4.0rc1 - let’s see what feedback we get 🙏

@awmackowiak
Copy link
Contributor Author

I'm delighted to see this release and I'm also curious about feedback from people :)

@MauritsMonteyne
Copy link

@awmackowiak Just did some testing and it definitely fixes my connection issues with Redis. Thanks a lot for this PR! 🙏

@Nusnus
Copy link
Member

Nusnus commented Jun 25, 2024

@awmackowiak Just did some testing and it definitely fixes my connection issues with Redis. Thanks a lot for this PR! 🙏

I’ll also share there’s been more feedback reporting the issue was successfully fixed in different issues on this topic.
Well done!! @awmackowiak 👏

@bdoublet91
Copy link

Hi,
Thanks for your job on this issue.
I have the same problem here but with rabbitmq mentionned in discussion celery/celery#7276 like some people.
Could it be the same problem ?

@MauritsMonteyne
Copy link

@bdoublet91 I don't have any RMQ projects to test this with, but I think this might very well be the case

@bdoublet91
Copy link

Ok, because I reported the problem with rabbitmq 3.11 too and celery 5.3 in production.
Should I start a new discussion about Rabbitmq ?

@Nusnus
Copy link
Member

Nusnus commented Jun 25, 2024

Should I start a new discussion about Rabbitmq ?

Yes, please.

@bdoublet91
Copy link

Ok thanks let's start here celery/celery#9095

@awmackowiak
Copy link
Contributor Author

Ok, because I reported the problem with rabbitmq 3.11 too and celery 5.3 in production. Should I start a new discussion about Rabbitmq ?

My change didn't change anything connected with rabbitmq/pyamqp.
It only fixes the connection issues with Redis.
But if you provide the steps/configuration with that problem in my free time I could check and maybe find out what's is going on in that case also.

@bdoublet91
Copy link

Yeah I understood that only for redis.
what steps/configuration do you need ? The behavior is the same as described here.

@Nusnus
Copy link
Member

Nusnus commented Jul 24, 2024

Celery v5.5.0b1 was released including this fix via Kombu release-candidate pre-release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants