Skip to content

Commit

Permalink
Added confirm_timeout argument. (#2167)
Browse files Browse the repository at this point in the history
  • Loading branch information
thedrow authored Oct 26, 2024
1 parent 1220144 commit 087527f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
10 changes: 6 additions & 4 deletions kombu/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def publish(self, body, routing_key=None, delivery_mode=None,
content_type=None, content_encoding=None, serializer=None,
headers=None, compression=None, exchange=None, retry=False,
retry_policy=None, declare=None, expiration=None, timeout=None,
confirm_timeout=None,
**properties):
"""Publish message to the specified exchange.
Expand Down Expand Up @@ -155,6 +156,8 @@ def publish(self, body, routing_key=None, delivery_mode=None,
Default is no expiration.
timeout (float): Set timeout to wait maximum timeout second
for message to publish.
confirm_timeout (float): Set confirm timeout to wait maximum timeout second
for message to confirm publishing if the channel is set to confirm publish mode.
**properties (Any): Additional message properties, see AMQP spec.
"""
_publish = self._publish
Expand Down Expand Up @@ -187,13 +190,12 @@ def publish(self, body, routing_key=None, delivery_mode=None,
return _publish(
body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory, immediate,
exchange_name, declare, timeout, retry, retry_policy
exchange_name, declare, timeout, confirm_timeout, retry, retry_policy
)

def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare, timeout=None,
retry=False, retry_policy=None):
immediate, exchange, declare, timeout=None, confirm_timeout=None, retry=False, retry_policy=None):
retry_policy = {} if retry_policy is None else retry_policy
channel = self.channel
message = channel.prepare_message(
Expand All @@ -213,7 +215,7 @@ def _publish(self, body, priority, content_type, content_encoding,
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
timeout=timeout
timeout=timeout, confirm_timeout=confirm_timeout
)

def _get_channel(self):
Expand Down
8 changes: 8 additions & 0 deletions t/unit/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ def test_publish_with_timeout_and_retry_policy(self):
timeout = p._channel.basic_publish.call_args[1]['timeout']
assert timeout == 1

def test_publish_with_confirm_timeout(self):
p = self.connection.Producer()
p.channel = Mock()
p.channel.connection.client.declared_entities = set()
p.publish('test_timeout', exchange=Exchange('foo'), confirm_timeout=1)
confirm_timeout = p._channel.basic_publish.call_args[1]['confirm_timeout']
assert confirm_timeout == 1

@patch('kombu.messaging.maybe_declare')
def test_publish_maybe_declare_with_retry_policy(self, maybe_declare):
p = self.connection.Producer(exchange=Exchange('foo'))
Expand Down

0 comments on commit 087527f

Please sign in to comment.