diff --git a/kombu/common.py b/kombu/common.py index c00ae0fed..29fc209ff 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -126,7 +126,7 @@ def _ensure_channel_is_bound(entity, channel): raise ChannelError( f"Cannot bind channel {channel} to entity {entity}") entity = entity.bind(channel) - return entity + return entity def _maybe_declare(entity, channel): @@ -159,7 +159,7 @@ def _maybe_declare(entity, channel): def _imaybe_declare(entity, channel, **retry_policy): - _ensure_channel_is_bound(entity, channel) + entity = _ensure_channel_is_bound(entity, channel) if not entity.channel.connection: raise RecoverableConnectionError('channel disconnected') diff --git a/kombu/messaging.py b/kombu/messaging.py index 99cf9e2f5..ffd631905 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -187,12 +187,14 @@ def publish(self, body, routing_key=None, delivery_mode=None, return _publish( body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, - exchange_name, declare, timeout + exchange_name, declare, timeout, retry, retry_policy ) def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, - immediate, exchange, declare, timeout=None): + immediate, exchange, declare, timeout=None, + retry=False, retry_policy=None): + retry_policy = {} if retry_policy is None else retry_policy channel = self.channel message = channel.prepare_message( body, priority, content_type, @@ -200,7 +202,8 @@ def _publish(self, body, priority, content_type, content_encoding, ) if declare: maybe_declare = self.maybe_declare - [maybe_declare(entity) for entity in declare] + for entity in declare: + maybe_declare(entity, retry=retry, **retry_policy) # handle autogenerated queue names for reply_to reply_to = properties.get('reply_to') diff --git a/t/unit/test_messaging.py b/t/unit/test_messaging.py index 61ccaaf34..9c8e22d69 100644 --- a/t/unit/test_messaging.py +++ b/t/unit/test_messaging.py @@ -3,7 +3,7 @@ import pickle import sys from collections import defaultdict -from unittest.mock import Mock, patch +from unittest.mock import ANY, Mock, patch import pytest @@ -170,6 +170,34 @@ def test_publish_with_timeout_and_retry_policy(self): timeout = p._channel.basic_publish.call_args[1]['timeout'] assert timeout == 1 + @patch('kombu.messaging.maybe_declare') + def test_publish_maybe_declare_with_retry_policy(self, maybe_declare): + p = self.connection.Producer(exchange=Exchange('foo')) + p.channel = Mock() + expected_retry_policy = { + "max_retries": 20, + "interval_start": 1, + "interval_step": 2, + "interval_max": 30, + "retry_errors": (OperationalError,) + } + p.publish('test_maybe_declare', exchange=Exchange('foo'), retry=True, retry_policy=expected_retry_policy) + maybe_declare.assert_called_once_with(ANY, ANY, True, **expected_retry_policy) + + @patch('kombu.common._imaybe_declare') + def test_publish_maybe_declare_with_retry_policy_ensure_connection(self, _imaybe_declare): + p = self.connection.Producer(exchange=Exchange('foo')) + p.channel = Mock() + expected_retry_policy = { + "max_retries": 20, + "interval_start": 1, + "interval_step": 2, + "interval_max": 30, + "retry_errors": (OperationalError,) + } + p.publish('test_maybe_declare', exchange=Exchange('foo'), retry=True, retry_policy=expected_retry_policy) + _imaybe_declare.assert_called_once_with(ANY, ANY, **expected_retry_policy) + def test_publish_with_reply_to(self): p = self.connection.Producer() p.channel = Mock() @@ -200,7 +228,7 @@ def test_publish_retry_with_declare(self): p.connection.ensure = Mock() ex = Exchange('foo') p._publish('hello', 0, '', '', {}, {}, 'rk', 0, 0, ex, declare=[ex]) - p.maybe_declare.assert_called_with(ex) + p.maybe_declare.assert_called_with(ex, retry=False) def test_revive_when_channel_is_connection(self): p = self.connection.Producer()