From 2f5882331237147ccd34fef646e7b38c338430bb Mon Sep 17 00:00:00 2001 From: Haim Daniel <64732931+haimjether@users.noreply.github.com> Date: Sun, 13 Oct 2024 16:49:55 +0300 Subject: [PATCH 1/2] Add support for Google Pub/Sub as transport broker (#2147) * Add support for Google Pub/Sub as transport broker * Add tests * Add docs * flake8 * flake8 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix future import * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add missing test requirements * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add monitoring dependency * Fix test for python3.8 * Mock better google's monitoring api * Flake8 * Add refdoc * Add extra url to workaround pypy grpcio * Add extra index url in tox for grpcio/pypy support * Revert "Add extra url to workaround pypy grpcio" This reverts commit dfde4d523c7487990d84d5c4b4b5c7f4a8d17294. * pin grpcio version to match extra_index wheel * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Reduce poll calls if qos denies msg rx --------- Co-authored-by: Haim Daniel Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Tomer Nosrati --- docs/reference/kombu.transport.gcpubsub.rst | 24 + kombu/transport/__init__.py | 3 +- kombu/transport/gcpubsub.py | 810 ++++++++++++++++++++ requirements/extras/gcpubsub.txt | 3 + requirements/test-ci.txt | 1 + setup.py | 1 + t/unit/transport/test_gcpubsub.py | 793 +++++++++++++++++++ tox.ini | 4 +- 8 files changed, 1637 insertions(+), 2 deletions(-) create mode 100644 docs/reference/kombu.transport.gcpubsub.rst create mode 100644 kombu/transport/gcpubsub.py create mode 100644 requirements/extras/gcpubsub.txt create mode 100644 t/unit/transport/test_gcpubsub.py diff --git a/docs/reference/kombu.transport.gcpubsub.rst b/docs/reference/kombu.transport.gcpubsub.rst new file mode 100644 index 000000000..bf5fc058b --- /dev/null +++ b/docs/reference/kombu.transport.gcpubsub.rst @@ -0,0 +1,24 @@ +============================================================== + Google Cloud Pub/Sub Transport - ``kombu.transport.gcpubsub`` +============================================================== + +.. currentmodule:: kombu.transport.gcpubsub + +.. automodule:: kombu.transport.gcpubsub + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 64dd35c66..180a27b4b 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -43,7 +43,8 @@ def supports_librabbitmq() -> bool | None: 'etcd': 'kombu.transport.etcd:Transport', 'azurestoragequeues': 'kombu.transport.azurestoragequeues:Transport', 'azureservicebus': 'kombu.transport.azureservicebus:Transport', - 'pyro': 'kombu.transport.pyro:Transport' + 'pyro': 'kombu.transport.pyro:Transport', + 'gcpubsub': 'kombu.transport.gcpubsub:Transport', } _transport_cache = {} diff --git a/kombu/transport/gcpubsub.py b/kombu/transport/gcpubsub.py new file mode 100644 index 000000000..368c7a380 --- /dev/null +++ b/kombu/transport/gcpubsub.py @@ -0,0 +1,810 @@ +"""GCP Pub/Sub transport module for kombu. + +More information about GCP Pub/Sub: +https://cloud.google.com/pubsub + +Features +======== +* Type: Virtual +* Supports Direct: Yes +* Supports Topic: No +* Supports Fanout: Yes +* Supports Priority: No +* Supports TTL: No + +Connection String +================= + +Connection string has the following formats: + +.. code-block:: + + gcpubsub://projects/project-name + +Transport Options +================= +* ``queue_name_prefix``: (str) Prefix for queue names. +* ``ack_deadline_seconds``: (int) The maximum time after receiving a message + and acknowledging it before pub/sub redelivers the message. +* ``expiration_seconds``: (int) Subscriptions without any subscriber + activity or changes made to their properties are removed after this period. + Examples of subscriber activities include open connections, + active pulls, or successful pushes. +* ``wait_time_seconds``: (int) The maximum time to wait for new messages. + Defaults to 10. +* ``retry_timeout_seconds``: (int) The maximum time to wait before retrying. +* ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk. + Defaults to 32. +""" + +from __future__ import annotations + +import dataclasses +import datetime +import string +import threading +from concurrent.futures import (FIRST_COMPLETED, Future, ThreadPoolExecutor, + wait) +from contextlib import suppress +from os import getpid +from queue import Empty +from threading import Lock +from time import monotonic, sleep +from uuid import NAMESPACE_OID, uuid3 + +from _socket import gethostname +from _socket import timeout as socket_timeout +from google.api_core.exceptions import (AlreadyExists, DeadlineExceeded, + PermissionDenied) +from google.api_core.retry import Retry +from google.cloud import monitoring_v3 +from google.cloud.monitoring_v3 import query +from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient +from google.cloud.pubsub_v1 import exceptions as pubsub_exceptions +from google.cloud.pubsub_v1.publisher import exceptions as publisher_exceptions +from google.cloud.pubsub_v1.subscriber import \ + exceptions as subscriber_exceptions +from google.pubsub_v1 import gapic_version as package_version + +from kombu.entity import TRANSIENT_DELIVERY_MODE +from kombu.log import get_logger +from kombu.utils.encoding import bytes_to_str, safe_str +from kombu.utils.json import dumps, loads +from kombu.utils.objects import cached_property + +from . import virtual + +logger = get_logger('kombu.transport.gcpubsub') + +# dots are replaced by dash, all other punctuation replaced by underscore. +PUNCTUATIONS_TO_REPLACE = set(string.punctuation) - {'_', '.', '-'} +CHARS_REPLACE_TABLE = { + ord('.'): ord('-'), + **{ord(c): ord('_') for c in PUNCTUATIONS_TO_REPLACE}, +} + + +class UnackedIds: + """Threadsafe list of ack_ids.""" + + def __init__(self): + self._list = [] + self._lock = Lock() + + def append(self, val): + # append is atomic + self._list.append(val) + + def extend(self, vals: list): + # extend is atomic + self._list.extend(vals) + + def pop(self, index=-1): + with self._lock: + return self._list.pop(index) + + def remove(self, val): + with self._lock, suppress(ValueError): + self._list.remove(val) + + def __len__(self): + with self._lock: + return len(self._list) + + def __getitem__(self, item): + # getitem is atomic + return self._list[item] + + +class AtomicCounter: + """Threadsafe counter. + + Returns the value after inc/dec operations. + """ + + def __init__(self, initial=0): + self._value = initial + self._lock = Lock() + + def inc(self, n=1): + with self._lock: + self._value += n + return self._value + + def dec(self, n=1): + with self._lock: + self._value -= n + return self._value + + def get(self): + with self._lock: + return self._value + + +@dataclasses.dataclass +class QueueDescriptor: + """Pub/Sub queue descriptor.""" + + name: str + topic_path: str # projects/{project_id}/topics/{topic_id} + subscription_id: str + subscription_path: str # projects/{project_id}/subscriptions/{subscription_id} + unacked_ids: UnackedIds = dataclasses.field(default_factory=UnackedIds) + + +class Channel(virtual.Channel): + """GCP Pub/Sub channel.""" + + supports_fanout = True + do_restore = False # pub/sub does that for us + default_wait_time_seconds = 10 + default_ack_deadline_seconds = 240 + default_expiration_seconds = 86400 + default_retry_timeout_seconds = 300 + default_bulk_max_messages = 32 + + _min_ack_deadline = 10 + _fanout_exchanges = set() + _unacked_extender: threading.Thread = None + _stop_extender = threading.Event() + _n_channels = AtomicCounter() + _queue_cache: dict[str, QueueDescriptor] = {} + _tmp_subscriptions: set[str] = set() + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.pool = ThreadPoolExecutor() + logger.info('new GCP pub/sub channel: %s', self.conninfo.hostname) + + self.project_id = Transport.parse_uri(self.conninfo.hostname) + if self._n_channels.inc() == 1: + Channel._unacked_extender = threading.Thread( + target=self._extend_unacked_deadline, + daemon=True, + ) + self._stop_extender.clear() + Channel._unacked_extender.start() + + def entity_name(self, name: str, table=CHARS_REPLACE_TABLE) -> str: + """Format AMQP queue name into a valid Pub/Sub queue name.""" + if not name.startswith(self.queue_name_prefix): + name = self.queue_name_prefix + name + + return str(safe_str(name)).translate(table) + + def _queue_bind(self, exchange, routing_key, pattern, queue): + exchange_type = self.typeof(exchange).type + queue = self.entity_name(queue) + logger.debug( + 'binding queue: %s to %s exchange: %s with routing_key: %s', + queue, + exchange_type, + exchange, + routing_key, + ) + + filter_args = {} + if exchange_type == 'direct': + # Direct exchange is implemented as a single subscription + # E.g. for exchange 'test_direct': + # -topic:'test_direct' + # -bound queue:'direct1': + # -subscription: direct1' on topic 'test_direct' + # -filter:routing_key' + filter_args = { + 'filter': f'attributes.routing_key="{routing_key}"' + } + subscription_path = self.subscriber.subscription_path( + self.project_id, queue + ) + message_retention_duration = self.expiration_seconds + elif exchange_type == 'fanout': + # Fanout exchange is implemented as a separate subscription. + # E.g. for exchange 'test_fanout': + # -topic:'test_fanout' + # -bound queue 'fanout1': + # -subscription:'fanout1-uuid' on topic 'test_fanout' + # -bound queue 'fanout2': + # -subscription:'fanout2-uuid' on topic 'test_fanout' + uid = f'{uuid3(NAMESPACE_OID, f"{gethostname()}.{getpid()}")}' + uniq_sub_name = f'{queue}-{uid}' + subscription_path = self.subscriber.subscription_path( + self.project_id, uniq_sub_name + ) + self._tmp_subscriptions.add(subscription_path) + self._fanout_exchanges.add(exchange) + message_retention_duration = 600 + else: + raise NotImplementedError( + f'exchange type {exchange_type} not implemented' + ) + exchange_topic = self._create_topic( + self.project_id, exchange, message_retention_duration + ) + self._create_subscription( + topic_path=exchange_topic, + subscription_path=subscription_path, + filter_args=filter_args, + msg_retention=message_retention_duration, + ) + qdesc = QueueDescriptor( + name=queue, + topic_path=exchange_topic, + subscription_id=queue, + subscription_path=subscription_path, + ) + self._queue_cache[queue] = qdesc + + def _create_topic( + self, + project_id: str, + topic_id: str, + message_retention_duration: int = None, + ) -> str: + topic_path = self.publisher.topic_path(project_id, topic_id) + if self._is_topic_exists(topic_path): + # topic creation takes a while, so skip if possible + logger.debug('topic: %s exists', topic_path) + return topic_path + try: + logger.debug('creating topic: %s', topic_path) + request = {'name': topic_path} + if message_retention_duration: + request[ + 'message_retention_duration' + ] = f'{message_retention_duration}s' + self.publisher.create_topic(request=request) + except AlreadyExists: + pass + + return topic_path + + def _is_topic_exists(self, topic_path: str) -> bool: + topics = self.publisher.list_topics( + request={"project": f'projects/{self.project_id}'} + ) + for t in topics: + if t.name == topic_path: + return True + return False + + def _create_subscription( + self, + project_id: str = None, + topic_id: str = None, + topic_path: str = None, + subscription_path: str = None, + filter_args=None, + msg_retention: int = None, + ) -> str: + subscription_path = ( + subscription_path + or self.subscriber.subscription_path(self.project_id, topic_id) + ) + topic_path = topic_path or self.publisher.topic_path( + project_id, topic_id + ) + try: + logger.debug( + 'creating subscription: %s, topic: %s, filter: %s', + subscription_path, + topic_path, + filter_args, + ) + msg_retention = msg_retention or self.expiration_seconds + self.subscriber.create_subscription( + request={ + "name": subscription_path, + "topic": topic_path, + 'ack_deadline_seconds': self.ack_deadline_seconds, + 'expiration_policy': { + 'ttl': f'{self.expiration_seconds}s' + }, + 'message_retention_duration': f'{msg_retention}s', + **(filter_args or {}), + } + ) + except AlreadyExists: + pass + return subscription_path + + def _delete(self, queue, *args, **kwargs): + """Delete a queue by name.""" + queue = self.entity_name(queue) + logger.info('deleting queue: %s', queue) + qdesc = self._queue_cache.get(queue) + if not qdesc: + return + self.subscriber.delete_subscription( + request={"subscription": qdesc.subscription_path} + ) + self._queue_cache.pop(queue, None) + + def _put(self, queue, message, **kwargs): + """Put a message onto the queue.""" + queue = self.entity_name(queue) + qdesc = self._queue_cache[queue] + routing_key = self._get_routing_key(message) + logger.debug( + 'putting message to queue: %s, topic: %s, routing_key: %s', + queue, + qdesc.topic_path, + routing_key, + ) + encoded_message = dumps(message) + self.publisher.publish( + qdesc.topic_path, + encoded_message.encode("utf-8"), + routing_key=routing_key, + ) + + def _put_fanout(self, exchange, message, routing_key, **kwargs): + """Put a message onto fanout exchange.""" + self._lookup(exchange, routing_key) + topic_path = self.publisher.topic_path(self.project_id, exchange) + logger.debug( + 'putting msg to fanout exchange: %s, topic: %s', + exchange, + topic_path, + ) + encoded_message = dumps(message) + self.publisher.publish( + topic_path, + encoded_message.encode("utf-8"), + retry=Retry(deadline=self.retry_timeout_seconds), + ) + + def _get(self, queue: str, timeout: float = None): + """Retrieves a single message from a queue.""" + queue = self.entity_name(queue) + qdesc = self._queue_cache[queue] + try: + response = self.subscriber.pull( + request={ + 'subscription': qdesc.subscription_path, + 'max_messages': 1, + }, + retry=Retry(deadline=self.retry_timeout_seconds), + timeout=timeout or self.wait_time_seconds, + ) + except DeadlineExceeded: + raise Empty() + + if len(response.received_messages) == 0: + raise Empty() + + message = response.received_messages[0] + ack_id = message.ack_id + payload = loads(message.message.data) + delivery_info = payload['properties']['delivery_info'] + logger.debug( + 'queue:%s got message, ack_id: %s, payload: %s', + queue, + ack_id, + payload['properties'], + ) + if self._is_auto_ack(payload['properties']): + logger.debug('auto acking message ack_id: %s', ack_id) + self._do_ack([ack_id], qdesc.subscription_path) + else: + delivery_info['gcpubsub_message'] = { + 'queue': queue, + 'ack_id': ack_id, + 'message_id': message.message.message_id, + 'subscription_path': qdesc.subscription_path, + } + qdesc.unacked_ids.append(ack_id) + + return payload + + def _is_auto_ack(self, payload_properties: dict): + exchange = payload_properties['delivery_info']['exchange'] + delivery_mode = payload_properties['delivery_mode'] + return ( + delivery_mode == TRANSIENT_DELIVERY_MODE + or exchange in self._fanout_exchanges + ) + + def _get_bulk(self, queue: str, timeout: float): + """Retrieves bulk of messages from a queue.""" + prefixed_queue = self.entity_name(queue) + qdesc = self._queue_cache[prefixed_queue] + max_messages = self._get_max_messages_estimate() + if not max_messages: + raise Empty() + try: + response = self.subscriber.pull( + request={ + 'subscription': qdesc.subscription_path, + 'max_messages': max_messages, + }, + retry=Retry(deadline=self.retry_timeout_seconds), + timeout=timeout or self.wait_time_seconds, + ) + except DeadlineExceeded: + raise Empty() + + received_messages = response.received_messages + if len(received_messages) == 0: + raise Empty() + + auto_ack_ids = [] + ret_payloads = [] + logger.debug( + 'batching %d messages from queue: %s', + len(received_messages), + prefixed_queue, + ) + for message in received_messages: + ack_id = message.ack_id + payload = loads(bytes_to_str(message.message.data)) + delivery_info = payload['properties']['delivery_info'] + delivery_info['gcpubsub_message'] = { + 'queue': prefixed_queue, + 'ack_id': ack_id, + 'message_id': message.message.message_id, + 'subscription_path': qdesc.subscription_path, + } + if self._is_auto_ack(payload['properties']): + auto_ack_ids.append(ack_id) + else: + qdesc.unacked_ids.append(ack_id) + ret_payloads.append(payload) + if auto_ack_ids: + logger.debug('auto acking ack_ids: %s', auto_ack_ids) + self._do_ack(auto_ack_ids, qdesc.subscription_path) + + return queue, ret_payloads + + def _get_max_messages_estimate(self) -> int: + max_allowed = self.qos.can_consume_max_estimate() + max_if_unlimited = self.bulk_max_messages + return max_if_unlimited if max_allowed is None else max_allowed + + def _lookup(self, exchange, routing_key, default=None): + exchange_info = self.state.exchanges.get(exchange, {}) + if not exchange_info: + return super()._lookup(exchange, routing_key, default) + ret = self.typeof(exchange).lookup( + self.get_table(exchange), + exchange, + routing_key, + default, + ) + if ret: + return ret + logger.debug( + 'no queues bound to exchange: %s, binding on the fly', + exchange, + ) + self.queue_bind(exchange, exchange, routing_key) + return [exchange] + + def _size(self, queue: str) -> int: + """Return the number of messages in a queue. + + This is a *rough* estimation, as Pub/Sub doesn't provide + an exact API. + """ + queue = self.entity_name(queue) + if queue not in self._queue_cache: + return 0 + qdesc = self._queue_cache[queue] + result = query.Query( + self.monitor, + self.project_id, + 'pubsub.googleapis.com/subscription/num_undelivered_messages', + end_time=datetime.datetime.now(), + minutes=1, + ).select_resources(subscription_id=qdesc.subscription_id) + + # monitoring API requires the caller to have the monitoring.viewer + # role. Since we can live without the exact number of messages + # in the queue, we can ignore the exception and allow users to + # use the transport without this role. + with suppress(PermissionDenied): + return sum( + content.points[0].value.int64_value for content in result + ) + return -1 + + def basic_ack(self, delivery_tag, multiple=False): + """Acknowledge one message.""" + if multiple: + raise NotImplementedError('multiple acks not implemented') + + delivery_info = self.qos.get(delivery_tag).delivery_info + pubsub_message = delivery_info['gcpubsub_message'] + ack_id = pubsub_message['ack_id'] + queue = pubsub_message['queue'] + logger.debug('ack message. queue: %s ack_id: %s', queue, ack_id) + subscription_path = pubsub_message['subscription_path'] + self._do_ack([ack_id], subscription_path) + qdesc = self._queue_cache[queue] + qdesc.unacked_ids.remove(ack_id) + super().basic_ack(delivery_tag) + + def _do_ack(self, ack_ids: list[str], subscription_path: str): + self.subscriber.acknowledge( + request={"subscription": subscription_path, "ack_ids": ack_ids}, + retry=Retry(deadline=self.retry_timeout_seconds), + ) + + def _purge(self, queue: str): + """Delete all current messages in a queue.""" + queue = self.entity_name(queue) + qdesc = self._queue_cache.get(queue) + if not qdesc: + return + + n = self._size(queue) + self.subscriber.seek( + request={ + "subscription": qdesc.subscription_path, + "time": datetime.datetime.now(), + } + ) + return n + + def _extend_unacked_deadline(self): + thread_id = threading.get_native_id() + logger.info( + 'unacked deadline extension thread: [%s] started', + thread_id, + ) + min_deadline_sleep = self._min_ack_deadline / 2 + sleep_time = max(min_deadline_sleep, self.ack_deadline_seconds / 4) + while not self._stop_extender.wait(sleep_time): + for qdesc in self._queue_cache.values(): + if len(qdesc.unacked_ids) == 0: + logger.debug( + 'thread [%s]: no unacked messages for %s', + thread_id, + qdesc.subscription_path, + ) + continue + logger.debug( + 'thread [%s]: extend ack deadline for %s: %d msgs [%s]', + thread_id, + qdesc.subscription_path, + len(qdesc.unacked_ids), + list(qdesc.unacked_ids), + ) + self.subscriber.modify_ack_deadline( + request={ + "subscription": qdesc.subscription_path, + "ack_ids": list(qdesc.unacked_ids), + "ack_deadline_seconds": self.ack_deadline_seconds, + } + ) + logger.info( + 'unacked deadline extension thread [%s] stopped', thread_id + ) + + def after_reply_message_received(self, queue: str): + queue = self.entity_name(queue) + sub = self.subscriber.subscription_path(self.project_id, queue) + logger.debug( + 'after_reply_message_received: queue: %s, sub: %s', queue, sub + ) + self._tmp_subscriptions.add(sub) + + @cached_property + def subscriber(self): + return SubscriberClient() + + @cached_property + def publisher(self): + return PublisherClient() + + @cached_property + def monitor(self): + return monitoring_v3.MetricServiceClient() + + @property + def conninfo(self): + return self.connection.client + + @property + def transport_options(self): + return self.connection.client.transport_options + + @cached_property + def wait_time_seconds(self): + return self.transport_options.get( + 'wait_time_seconds', self.default_wait_time_seconds + ) + + @cached_property + def retry_timeout_seconds(self): + return self.transport_options.get( + 'retry_timeout_seconds', self.default_retry_timeout_seconds + ) + + @cached_property + def ack_deadline_seconds(self): + return self.transport_options.get( + 'ack_deadline_seconds', self.default_ack_deadline_seconds + ) + + @cached_property + def queue_name_prefix(self): + return self.transport_options.get('queue_name_prefix', 'kombu-') + + @cached_property + def expiration_seconds(self): + return self.transport_options.get( + 'expiration_seconds', self.default_expiration_seconds + ) + + @cached_property + def bulk_max_messages(self): + return self.transport_options.get( + 'bulk_max_messages', self.default_bulk_max_messages + ) + + def close(self): + """Close the channel.""" + logger.debug('closing channel') + while self._tmp_subscriptions: + sub = self._tmp_subscriptions.pop() + with suppress(Exception): + logger.debug('deleting subscription: %s', sub) + self.subscriber.delete_subscription( + request={"subscription": sub} + ) + if not self._n_channels.dec(): + self._stop_extender.set() + Channel._unacked_extender.join() + super().close() + + @staticmethod + def _get_routing_key(message): + routing_key = ( + message['properties'] + .get('delivery_info', {}) + .get('routing_key', '') + ) + return routing_key + + +class Transport(virtual.Transport): + """GCP Pub/Sub transport.""" + + Channel = Channel + + can_parse_url = True + polling_interval = 0.1 + connection_errors = virtual.Transport.connection_errors + ( + pubsub_exceptions.TimeoutError, + ) + channel_errors = ( + virtual.Transport.channel_errors + + ( + publisher_exceptions.FlowControlLimitError, + publisher_exceptions.MessageTooLargeError, + publisher_exceptions.PublishError, + publisher_exceptions.TimeoutError, + publisher_exceptions.PublishToPausedOrderingKeyException, + ) + + (subscriber_exceptions.AcknowledgeError,) + ) + + driver_type = 'gcpubsub' + driver_name = 'pubsub_v1' + + implements = virtual.Transport.implements.extend( + exchange_type=frozenset(['direct', 'fanout']), + ) + + def __init__(self, client, **kwargs): + super().__init__(client, **kwargs) + self._pool = ThreadPoolExecutor() + self._get_bulk_future_to_queue: dict[Future, str] = dict() + + def driver_version(self): + return package_version.__version__ + + @staticmethod + def parse_uri(uri: str) -> str: + # URL like: + # gcpubsub://projects/project-name + + project = uri.split('gcpubsub://projects/')[1] + return project.strip('/') + + @classmethod + def as_uri(self, uri: str, include_password=False, mask='**') -> str: + return uri or 'gcpubsub://' + + def drain_events(self, connection, timeout=None): + time_start = monotonic() + polling_interval = self.polling_interval + if timeout and polling_interval and polling_interval > timeout: + polling_interval = timeout + while 1: + try: + self._drain_from_active_queues(timeout=timeout) + except Empty: + if timeout and monotonic() - time_start >= timeout: + raise socket_timeout() + if polling_interval: + sleep(polling_interval) + else: + break + + def _drain_from_active_queues(self, timeout): + # cleanup empty requests from prev run + self._rm_empty_bulk_requests() + + # submit new requests for all active queues + # longer timeout means less frequent polling + # and more messages in a single bulk + self._submit_get_bulk_requests(timeout=10) + + done, _ = wait( + self._get_bulk_future_to_queue, + timeout=timeout, + return_when=FIRST_COMPLETED, + ) + empty = {f for f in done if f.exception()} + done -= empty + for f in empty: + self._get_bulk_future_to_queue.pop(f, None) + + if not done: + raise Empty() + + logger.debug('got %d done get_bulk tasks', len(done)) + for f in done: + queue, payloads = f.result() + for payload in payloads: + logger.debug('consuming message from queue: %s', queue) + if queue not in self._callbacks: + logger.warning( + 'Message for queue %s without consumers', queue + ) + continue + self._deliver(payload, queue) + self._get_bulk_future_to_queue.pop(f, None) + + def _rm_empty_bulk_requests(self): + empty = { + f + for f in self._get_bulk_future_to_queue + if f.done() and f.exception() + } + for f in empty: + self._get_bulk_future_to_queue.pop(f, None) + + def _submit_get_bulk_requests(self, timeout): + queues_with_submitted_get_bulk = set( + self._get_bulk_future_to_queue.values() + ) + + for channel in self.channels: + for queue in channel._active_queues: + if queue in queues_with_submitted_get_bulk: + continue + future = self._pool.submit(channel._get_bulk, queue, timeout) + self._get_bulk_future_to_queue[future] = queue diff --git a/requirements/extras/gcpubsub.txt b/requirements/extras/gcpubsub.txt new file mode 100644 index 000000000..6dfee275f --- /dev/null +++ b/requirements/extras/gcpubsub.txt @@ -0,0 +1,3 @@ +google-cloud-pubsub>=2.18.4 +google-cloud-monitoring>=2.16.0 +grpcio==1.66.2 diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index f9ffaf950..8882410ed 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -16,3 +16,4 @@ urllib3>=1.26.16; sys_platform != 'win32' -r extras/zstd.txt -r extras/sqlalchemy.txt -r extras/etcd.txt +-r extras/gcpubsub.txt diff --git a/setup.py b/setup.py index da1ac7303..f4cb25799 100644 --- a/setup.py +++ b/setup.py @@ -103,6 +103,7 @@ def readme(): 'redis': extras('redis.txt'), 'mongodb': extras('mongodb.txt'), 'sqs': extras('sqs.txt'), + 'gcpubsub': extras('gcpubsub.txt'), 'zookeeper': extras('zookeeper.txt'), 'sqlalchemy': extras('sqlalchemy.txt'), 'librabbitmq': extras('librabbitmq.txt'), diff --git a/t/unit/transport/test_gcpubsub.py b/t/unit/transport/test_gcpubsub.py new file mode 100644 index 000000000..5e60336b6 --- /dev/null +++ b/t/unit/transport/test_gcpubsub.py @@ -0,0 +1,793 @@ +from __future__ import annotations + +from concurrent.futures import Future +from datetime import datetime +from queue import Empty +from unittest.mock import MagicMock, call, patch + +import pytest +from _socket import timeout as socket_timeout +from google.api_core.exceptions import (AlreadyExists, DeadlineExceeded, + PermissionDenied) + +from kombu.transport.gcpubsub import (AtomicCounter, Channel, QueueDescriptor, + Transport, UnackedIds) + + +class test_UnackedIds: + def setup_method(self): + self.unacked_ids = UnackedIds() + + def test_append(self): + self.unacked_ids.append('test_id') + assert self.unacked_ids[0] == 'test_id' + + def test_extend(self): + self.unacked_ids.extend(['test_id1', 'test_id2']) + assert self.unacked_ids[0] == 'test_id1' + assert self.unacked_ids[1] == 'test_id2' + + def test_pop(self): + self.unacked_ids.append('test_id') + popped_id = self.unacked_ids.pop() + assert popped_id == 'test_id' + assert len(self.unacked_ids) == 0 + + def test_remove(self): + self.unacked_ids.append('test_id') + self.unacked_ids.remove('test_id') + assert len(self.unacked_ids) == 0 + + def test_len(self): + self.unacked_ids.append('test_id') + assert len(self.unacked_ids) == 1 + + def test_getitem(self): + self.unacked_ids.append('test_id') + assert self.unacked_ids[0] == 'test_id' + + +class test_AtomicCounter: + def setup_method(self): + self.counter = AtomicCounter() + + def test_inc(self): + assert self.counter.inc() == 1 + assert self.counter.inc(5) == 6 + + def test_dec(self): + self.counter.inc(5) + assert self.counter.dec() == 4 + assert self.counter.dec(2) == 2 + + def test_get(self): + self.counter.inc(7) + assert self.counter.get() == 7 + + +@pytest.fixture +def channel(): + with patch.object(Channel, '__init__', lambda self: None): + channel = Channel() + channel.connection = MagicMock() + channel.queue_name_prefix = "kombu-" + channel.project_id = "test_project" + channel._queue_cache = {} + channel._n_channels = MagicMock() + channel._stop_extender = MagicMock() + channel.subscriber = MagicMock() + channel.publisher = MagicMock() + channel.closed = False + with patch.object( + Channel, 'conninfo', new_callable=MagicMock + ), patch.object( + Channel, 'transport_options', new_callable=MagicMock + ), patch.object( + Channel, 'qos', new_callable=MagicMock + ): + yield channel + + +class test_Channel: + @patch('kombu.transport.gcpubsub.ThreadPoolExecutor') + @patch('kombu.transport.gcpubsub.threading.Event') + @patch('kombu.transport.gcpubsub.threading.Thread') + @patch( + 'kombu.transport.gcpubsub.Channel._get_free_channel_id', + return_value=1, + ) + @patch( + 'kombu.transport.gcpubsub.Channel._n_channels.inc', + return_value=1, + ) + def test_channel_init( + self, + n_channels_in_mock, + channel_id_mock, + mock_thread, + mock_event, + mock_executor, + ): + mock_connection = MagicMock() + ch = Channel(mock_connection) + ch._n_channels.inc.assert_called_once() + mock_thread.assert_called_once_with( + target=ch._extend_unacked_deadline, + daemon=True, + ) + mock_thread.return_value.start.assert_called_once() + + def test_entity_name(self, channel): + name = "test_queue" + result = channel.entity_name(name) + assert result == "kombu-test_queue" + + @patch('kombu.transport.gcpubsub.uuid3', return_value='uuid') + @patch('kombu.transport.gcpubsub.gethostname', return_value='hostname') + @patch('kombu.transport.gcpubsub.getpid', return_value=1234) + def test_queue_bind_direct( + self, mock_pid, mock_hostname, mock_uuid, channel + ): + exchange = 'direct' + routing_key = 'test_routing_key' + pattern = 'test_pattern' + queue = 'test_queue' + subscription_path = 'projects/project-id/subscriptions/test_queue' + channel.subscriber.subscription_path = MagicMock( + return_value=subscription_path + ) + channel._create_topic = MagicMock(return_value='topic_path') + channel._create_subscription = MagicMock() + + # Mock the state and exchange type + mock_connection = MagicMock(name='mock_connection') + channel.connection = mock_connection + channel.state.exchanges = {exchange: {'type': 'direct'}} + mock_exchange = MagicMock(name='mock_exchange', type='direct') + channel.exchange_types = {'direct': mock_exchange} + + channel._queue_bind(exchange, routing_key, pattern, queue) + + channel._create_topic.assert_called_once_with( + channel.project_id, exchange, channel.expiration_seconds + ) + channel._create_subscription.assert_called_once_with( + topic_path='topic_path', + subscription_path=subscription_path, + filter_args={'filter': f'attributes.routing_key="{routing_key}"'}, + msg_retention=channel.expiration_seconds, + ) + assert channel.entity_name(queue) in channel._queue_cache + + @patch('kombu.transport.gcpubsub.uuid3', return_value='uuid') + @patch('kombu.transport.gcpubsub.gethostname', return_value='hostname') + @patch('kombu.transport.gcpubsub.getpid', return_value=1234) + def test_queue_bind_fanout( + self, mock_pid, mock_hostname, mock_uuid, channel + ): + exchange = 'test_exchange' + routing_key = 'test_routing_key' + pattern = 'test_pattern' + queue = 'test_queue' + uniq_sub_name = 'test_queue-uuid' + subscription_path = ( + f'projects/project-id/subscriptions/{uniq_sub_name}' + ) + channel.subscriber.subscription_path = MagicMock( + return_value=subscription_path + ) + channel._create_topic = MagicMock(return_value='topic_path') + channel._create_subscription = MagicMock() + + # Mock the state and exchange type + mock_connection = MagicMock(name='mock_connection') + channel.connection = mock_connection + channel.state.exchanges = {exchange: {'type': 'fanout'}} + mock_exchange = MagicMock(name='mock_exchange', type='fanout') + channel.exchange_types = {'fanout': mock_exchange} + + channel._queue_bind(exchange, routing_key, pattern, queue) + + channel._create_topic.assert_called_once_with( + channel.project_id, exchange, 600 + ) + channel._create_subscription.assert_called_once_with( + topic_path='topic_path', + subscription_path=subscription_path, + filter_args={}, + msg_retention=600, + ) + assert channel.entity_name(queue) in channel._queue_cache + assert subscription_path in channel._tmp_subscriptions + assert exchange in channel._fanout_exchanges + + def test_queue_bind_not_implemented(self, channel): + exchange = 'test_exchange' + routing_key = 'test_routing_key' + pattern = 'test_pattern' + queue = 'test_queue' + channel.typeof = MagicMock(return_value=MagicMock(type='unsupported')) + + with pytest.raises(NotImplementedError): + channel._queue_bind(exchange, routing_key, pattern, queue) + + def test_create_topic(self, channel): + channel.project_id = "project_id" + topic_id = "topic_id" + channel._is_topic_exists = MagicMock(return_value=False) + channel.publisher.topic_path = MagicMock(return_value="topic_path") + channel.publisher.create_topic = MagicMock() + result = channel._create_topic(channel.project_id, topic_id) + assert result == "topic_path" + channel.publisher.create_topic.assert_called_once() + + channel._create_topic( + channel.project_id, topic_id, message_retention_duration=10 + ) + assert ( + dict( + request={ + 'name': 'topic_path', + 'message_retention_duration': '10s', + } + ) + in channel.publisher.create_topic.call_args + ) + channel.publisher.create_topic.side_effect = AlreadyExists( + "test_error" + ) + channel._create_topic( + channel.project_id, topic_id, message_retention_duration=10 + ) + + def test_is_topic_exists(self, channel): + topic_path = "projects/project-id/topics/test_topic" + mock_topic = MagicMock() + mock_topic.name = topic_path + channel.publisher.list_topics.return_value = [mock_topic] + + result = channel._is_topic_exists(topic_path) + + assert result is True + channel.publisher.list_topics.assert_called_once_with( + request={"project": f'projects/{channel.project_id}'} + ) + + def test_is_topic_not_exists(self, channel): + topic_path = "projects/project-id/topics/test_topic" + channel.publisher.list_topics.return_value = [] + + result = channel._is_topic_exists(topic_path) + + assert result is False + channel.publisher.list_topics.assert_called_once_with( + request={"project": f'projects/{channel.project_id}'} + ) + + def test_create_subscription(self, channel): + channel.project_id = "project_id" + topic_id = "topic_id" + subscription_path = "subscription_path" + topic_path = "topic_path" + channel.subscriber.subscription_path = MagicMock( + return_value=subscription_path + ) + channel.publisher.topic_path = MagicMock(return_value=topic_path) + channel.subscriber.create_subscription = MagicMock() + result = channel._create_subscription( + project_id=channel.project_id, + topic_id=topic_id, + subscription_path=subscription_path, + topic_path=topic_path, + ) + assert result == subscription_path + channel.subscriber.create_subscription.assert_called_once() + + def test_delete(self, channel): + queue = "test_queue" + subscription_path = "projects/project-id/subscriptions/test_queue" + qdesc = QueueDescriptor( + name=queue, + topic_path="projects/project-id/topics/test_topic", + subscription_id=queue, + subscription_path=subscription_path, + ) + channel.subscriber = MagicMock() + channel._queue_cache[channel.entity_name(queue)] = qdesc + + channel._delete(queue) + + channel.subscriber.delete_subscription.assert_called_once_with( + request={"subscription": subscription_path} + ) + assert queue not in channel._queue_cache + + def test_put(self, channel): + queue = "test_queue" + message = { + "properties": {"delivery_info": {"routing_key": "test_key"}} + } + channel.entity_name = MagicMock(return_value=queue) + channel._queue_cache[channel.entity_name(queue)] = QueueDescriptor( + name=queue, + topic_path="topic_path", + subscription_id=queue, + subscription_path="subscription_path", + ) + channel._get_routing_key = MagicMock(return_value="test_key") + channel.publisher.publish = MagicMock() + channel._put(queue, message) + channel.publisher.publish.assert_called_once() + + def test_put_fanout(self, channel): + exchange = "test_exchange" + message = { + "properties": {"delivery_info": {"routing_key": "test_key"}} + } + routing_key = "test_key" + + channel._lookup = MagicMock() + channel.publisher.topic_path = MagicMock(return_value="topic_path") + channel.publisher.publish = MagicMock() + + channel._put_fanout(exchange, message, routing_key) + + channel._lookup.assert_called_once_with(exchange, routing_key) + channel.publisher.topic_path.assert_called_once_with( + channel.project_id, exchange + ) + assert 'topic_path', ( + b'{"properties": {"delivery_info": {"routing_key": "test_key"}}}' + in channel.publisher.publish.call_args + ) + + def test_get(self, channel): + queue = "test_queue" + channel.entity_name = MagicMock(return_value=queue) + channel._queue_cache[queue] = QueueDescriptor( + name=queue, + topic_path="topic_path", + subscription_id=queue, + subscription_path="subscription_path", + ) + channel.subscriber.pull = MagicMock( + return_value=MagicMock( + received_messages=[ + MagicMock( + ack_id="ack_id", + message=MagicMock( + data=b'{"properties": ' + b'{"delivery_info": ' + b'{"exchange": "exchange"},"delivery_mode": 1}}' + ), + ) + ] + ) + ) + channel.subscriber.acknowledge = MagicMock() + payload = channel._get(queue) + assert ( + payload["properties"]["delivery_info"]["exchange"] == "exchange" + ) + channel.subscriber.pull.side_effect = DeadlineExceeded("test_error") + with pytest.raises(Empty): + channel._get(queue) + + def test_get_bulk(self, channel): + queue = "test_queue" + subscription_path = "projects/project-id/subscriptions/test_queue" + qdesc = QueueDescriptor( + name=queue, + topic_path="projects/project-id/topics/test_topic", + subscription_id=queue, + subscription_path=subscription_path, + ) + channel._queue_cache[channel.entity_name(queue)] = qdesc + + data = b'{"properties": {"delivery_info": {"exchange": "exchange"}}}' + received_message = MagicMock( + ack_id="ack_id", + message=MagicMock(data=data), + ) + channel.subscriber.pull = MagicMock( + return_value=MagicMock(received_messages=[received_message]) + ) + channel.bulk_max_messages = 10 + channel._is_auto_ack = MagicMock(return_value=True) + channel._do_ack = MagicMock() + channel.qos.can_consume_max_estimate = MagicMock(return_value=None) + queue, payloads = channel._get_bulk(queue, timeout=10) + + assert len(payloads) == 1 + assert ( + payloads[0]["properties"]["delivery_info"]["exchange"] + == "exchange" + ) + channel._do_ack.assert_called_once_with(["ack_id"], subscription_path) + + channel.subscriber.pull.side_effect = DeadlineExceeded("test_error") + with pytest.raises(Empty): + channel._get_bulk(queue, timeout=10) + + def test_lookup(self, channel): + exchange = "test_exchange" + routing_key = "test_key" + default = None + + channel.connection = MagicMock() + channel.state.exchanges = {exchange: {"type": "direct"}} + channel.typeof = MagicMock( + return_value=MagicMock(lookup=MagicMock(return_value=["queue1"])) + ) + channel.get_table = MagicMock(return_value="table") + + result = channel._lookup(exchange, routing_key, default) + + channel.typeof.return_value.lookup.assert_called_once_with( + "table", exchange, routing_key, default + ) + assert result == ["queue1"] + + # Test the case where no queues are bound to the exchange + channel.typeof.return_value.lookup.return_value = None + channel.queue_bind = MagicMock() + + result = channel._lookup(exchange, routing_key, default) + + channel.queue_bind.assert_called_once_with( + exchange, exchange, routing_key + ) + assert result == [exchange] + + @patch('kombu.transport.gcpubsub.monitoring_v3') + @patch('kombu.transport.gcpubsub.query.Query') + def test_size(self, mock_query, mock_monitor, channel): + queue = "test_queue" + subscription_id = "test_subscription" + qdesc = QueueDescriptor( + name=queue, + topic_path="projects/project-id/topics/test_topic", + subscription_id=subscription_id, + subscription_path="projects/project-id/subscriptions/test_subscription", # E501 + ) + channel._queue_cache[channel.entity_name(queue)] = qdesc + + mock_query_result = MagicMock() + mock_query_result.select_resources.return_value = [ + MagicMock(points=[MagicMock(value=MagicMock(int64_value=5))]) + ] + mock_query.return_value = mock_query_result + size = channel._size(queue) + assert size == 5 + + # Test the case where the queue is not in the cache + size = channel._size("non_existent_queue") + assert size == 0 + + # Test the case where the query raises PermissionDenied + mock_item = MagicMock() + mock_item.points.__getitem__.side_effect = PermissionDenied( + 'test_error' + ) + mock_query_result.select_resources.return_value = [mock_item] + size = channel._size(queue) + assert size == -1 + + def test_basic_ack(self, channel): + delivery_tag = "test_delivery_tag" + ack_id = "test_ack_id" + queue = "test_queue" + subscription_path = ( + "projects/project-id/subscriptions/test_subscription" + ) + qdesc = QueueDescriptor( + name=queue, + topic_path="projects/project-id/topics/test_topic", + subscription_id="test_subscription", + subscription_path=subscription_path, + ) + channel._queue_cache[queue] = qdesc + + delivery_info = { + 'gcpubsub_message': { + 'queue': queue, + 'ack_id': ack_id, + 'subscription_path': subscription_path, + } + } + channel.qos.get = MagicMock( + return_value=MagicMock(delivery_info=delivery_info) + ) + channel._do_ack = MagicMock() + + channel.basic_ack(delivery_tag) + + channel._do_ack.assert_called_once_with([ack_id], subscription_path) + assert ack_id not in qdesc.unacked_ids + + def test_do_ack(self, channel): + ack_ids = ["ack_id1", "ack_id2"] + subscription_path = ( + "projects/project-id/subscriptions/test_subscription" + ) + channel.subscriber = MagicMock() + + channel._do_ack(ack_ids, subscription_path) + assert subscription_path, ( + ack_ids in channel.subscriber.acknowledge.call_args + ) + + def test_purge(self, channel): + queue = "test_queue" + subscription_path = f"projects/project-id/subscriptions/{queue}" + qdesc = QueueDescriptor( + name=queue, + topic_path="projects/project-id/topics/test_topic", + subscription_id="test_subscription", + subscription_path=subscription_path, + ) + channel._queue_cache[channel.entity_name(queue)] = qdesc + channel.subscriber = MagicMock() + + with patch.object(channel, '_size', return_value=10), patch( + 'kombu.transport.gcpubsub.datetime.datetime' + ) as dt_mock: + dt_mock.now.return_value = datetime(2021, 1, 1) + result = channel._purge(queue) + assert result == 10 + channel.subscriber.seek.assert_called_once_with( + request={ + "subscription": subscription_path, + "time": datetime(2021, 1, 1), + } + ) + + # Test the case where the queue is not in the cache + result = channel._purge("non_existent_queue") + assert result is None + + def test_extend_unacked_deadline(self, channel): + queue = "test_queue" + subscription_path = ( + "projects/project-id/subscriptions/test_subscription" + ) + ack_ids = ["ack_id1", "ack_id2"] + qdesc = QueueDescriptor( + name=queue, + topic_path="projects/project-id/topics/test_topic", + subscription_id="test_subscription", + subscription_path=subscription_path, + ) + channel.transport_options = {"ack_deadline_seconds": 240} + channel._queue_cache[channel.entity_name(queue)] = qdesc + qdesc.unacked_ids.extend(ack_ids) + + channel._stop_extender.wait = MagicMock(side_effect=[False, True]) + channel.subscriber.modify_ack_deadline = MagicMock() + + channel._extend_unacked_deadline() + + channel.subscriber.modify_ack_deadline.assert_called_once_with( + request={ + "subscription": subscription_path, + "ack_ids": ack_ids, + "ack_deadline_seconds": 240, + } + ) + for _ in ack_ids: + qdesc.unacked_ids.pop() + channel._stop_extender.wait = MagicMock(side_effect=[False, True]) + modify_ack_deadline_calls = ( + channel.subscriber.modify_ack_deadline.call_count + ) + channel._extend_unacked_deadline() + assert ( + channel.subscriber.modify_ack_deadline.call_count + == modify_ack_deadline_calls + ) + + def test_after_reply_message_received(self, channel): + queue = 'test-queue' + subscription_path = f'projects/test-project/subscriptions/{queue}' + + channel.subscriber.subscription_path.return_value = subscription_path + channel.after_reply_message_received(queue) + + # Check that the subscription path is added to _tmp_subscriptions + assert subscription_path in channel._tmp_subscriptions + + def test_subscriber(self, channel): + assert channel.subscriber + + def test_publisher(self, channel): + assert channel.publisher + + def test_transport_options(self, channel): + assert channel.transport_options + + def test_bulk_max_messages_default(self, channel): + assert channel.bulk_max_messages == channel.transport_options.get( + 'bulk_max_messages' + ) + + def test_close(self, channel): + channel._tmp_subscriptions = {'sub1', 'sub2'} + channel._n_channels.dec.return_value = 0 + + with patch.object( + Channel._unacked_extender, 'join' + ) as mock_join, patch( + 'kombu.transport.virtual.Channel.close' + ) as mock_super_close: + channel.close() + + channel.subscriber.delete_subscription.assert_has_calls( + [ + call(request={"subscription": 'sub1'}), + call(request={"subscription": 'sub2'}), + ], + any_order=True, + ) + channel._stop_extender.set.assert_called_once() + mock_join.assert_called_once() + mock_super_close.assert_called_once() + + +@pytest.fixture +def transport(): + return Transport(client=MagicMock()) + + +class test_Transport: + def test_driver_version(self, transport): + assert transport.driver_version() + + def test_as_uri(self, transport): + result = transport.as_uri('gcpubsub://') + assert result == 'gcpubsub://' + + def test_drain_events_timeout(self, transport): + transport.polling_interval = 4 + with patch.object( + transport, '_drain_from_active_queues', side_effect=Empty + ), patch( + 'kombu.transport.gcpubsub.monotonic', + side_effect=[0, 1, 2, 3, 4, 5], + ), patch( + 'kombu.transport.gcpubsub.sleep' + ) as mock_sleep: + with pytest.raises(socket_timeout): + transport.drain_events(None, timeout=3) + mock_sleep.assert_called() + + def test_drain_events_no_timeout(self, transport): + with patch.object( + transport, '_drain_from_active_queues', side_effect=[Empty, None] + ), patch( + 'kombu.transport.gcpubsub.monotonic', side_effect=[0, 1] + ), patch( + 'kombu.transport.gcpubsub.sleep' + ) as mock_sleep: + transport.drain_events(None, timeout=None) + mock_sleep.assert_called() + + def test_drain_events_polling_interval(self, transport): + transport.polling_interval = 2 + with patch.object( + transport, '_drain_from_active_queues', side_effect=[Empty, None] + ), patch( + 'kombu.transport.gcpubsub.monotonic', side_effect=[0, 1, 2] + ), patch( + 'kombu.transport.gcpubsub.sleep' + ) as mock_sleep: + transport.drain_events(None, timeout=5) + mock_sleep.assert_called_with(2) + + def test_drain_from_active_queues_empty(self, transport): + with patch.object( + transport, '_rm_empty_bulk_requests' + ) as mock_rm_empty, patch.object( + transport, '_submit_get_bulk_requests' + ) as mock_submit, patch( + 'kombu.transport.gcpubsub.wait', return_value=(set(), set()) + ) as mock_wait: + with pytest.raises(Empty): + transport._drain_from_active_queues(timeout=10) + mock_rm_empty.assert_called_once() + mock_submit.assert_called_once_with(timeout=10) + mock_wait.assert_called_once() + + def test_drain_from_active_queues_done(self, transport): + future = Future() + future.set_result(('queue', [{'properties': {'delivery_info': {}}}])) + + with patch.object( + transport, '_rm_empty_bulk_requests' + ) as mock_rm_empty, patch.object( + transport, '_submit_get_bulk_requests' + ) as mock_submit, patch( + 'kombu.transport.gcpubsub.wait', return_value=({future}, set()) + ) as mock_wait, patch.object( + transport, '_deliver' + ) as mock_deliver: + transport._callbacks = {'queue'} + transport._drain_from_active_queues(timeout=10) + mock_rm_empty.assert_called_once() + mock_submit.assert_called_once_with(timeout=10) + mock_wait.assert_called_once() + mock_deliver.assert_called_once_with( + {'properties': {'delivery_info': {}}}, 'queue' + ) + + mock_deliver_call_count = mock_deliver.call_count + transport._callbacks = {} + transport._drain_from_active_queues(timeout=10) + assert mock_deliver_call_count == mock_deliver.call_count + + def test_drain_from_active_queues_exception(self, transport): + future = Future() + future.set_exception(Exception("Test exception")) + + with patch.object( + transport, '_rm_empty_bulk_requests' + ) as mock_rm_empty, patch.object( + transport, '_submit_get_bulk_requests' + ) as mock_submit, patch( + 'kombu.transport.gcpubsub.wait', return_value=({future}, set()) + ) as mock_wait: + with pytest.raises(Empty): + transport._drain_from_active_queues(timeout=10) + mock_rm_empty.assert_called_once() + mock_submit.assert_called_once_with(timeout=10) + mock_wait.assert_called_once() + + def test_rm_empty_bulk_requests(self, transport): + # Create futures with exceptions to simulate empty requests + future_with_exception = Future() + future_with_exception.set_exception(Exception("Test exception")) + + transport._get_bulk_future_to_queue = { + future_with_exception: 'queue1', + } + + transport._rm_empty_bulk_requests() + + # Assert that the future with exception is removed + assert ( + future_with_exception not in transport._get_bulk_future_to_queue + ) + + def test_submit_get_bulk_requests(self, transport): + channel_mock = MagicMock(spec=Channel) + channel_mock._active_queues = ['queue1', 'queue2'] + transport.channels = [channel_mock] + + with patch.object( + transport._pool, 'submit', return_value=MagicMock() + ) as mock_submit: + transport._submit_get_bulk_requests(timeout=10) + + # Check that submit was called twice, once for each queue + assert mock_submit.call_count == 2 + mock_submit.assert_any_call(channel_mock._get_bulk, 'queue1', 10) + mock_submit.assert_any_call(channel_mock._get_bulk, 'queue2', 10) + + def test_submit_get_bulk_requests_with_existing_futures(self, transport): + channel_mock = MagicMock(spec=Channel) + channel_mock._active_queues = ['queue1', 'queue2'] + transport.channels = [channel_mock] + + # Simulate existing futures + future_mock = MagicMock() + transport._get_bulk_future_to_queue = {future_mock: 'queue1'} + + with patch.object( + transport._pool, 'submit', return_value=MagicMock() + ) as mock_submit: + transport._submit_get_bulk_requests(timeout=10) + + # Check that submit was called only once for the new queue + assert mock_submit.call_count == 1 + mock_submit.assert_called_with( + channel_mock._get_bulk, 'queue2', 10 + ) diff --git a/tox.ini b/tox.ini index b1259e0ec..7b45b9b49 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,9 @@ python = [testenv] sitepackages = False -setenv = C_DEBUG_TEST = 1 +setenv = + C_DEBUG_TEST = 1 + PIP_EXTRA_INDEX_URL=https://celery.github.io/celery-wheelhouse/repo/simple/ passenv = DISTUTILS_USE_SDK deps= From 3f5d19973ecc42f4b84aa0a2945d95c73b65f54b Mon Sep 17 00:00:00 2001 From: Omer Katz Date: Sun, 13 Oct 2024 20:11:22 +0300 Subject: [PATCH 2/2] Update the transport options according to the retry policy (#2148) * Update the transport options according to the retry policy. * Add unit test. --- kombu/messaging.py | 1 + t/unit/test_messaging.py | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/kombu/messaging.py b/kombu/messaging.py index 588e8a267..99cf9e2f5 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -182,6 +182,7 @@ def publish(self, body, routing_key=None, delivery_mode=None, declare.append(self.exchange) if retry: + self.connection.transport_options.update(retry_policy) _publish = self.connection.ensure(self, _publish, **retry_policy) return _publish( body, priority, content_type, content_encoding, diff --git a/t/unit/test_messaging.py b/t/unit/test_messaging.py index c48af6d7a..beefe44e3 100644 --- a/t/unit/test_messaging.py +++ b/t/unit/test_messaging.py @@ -122,6 +122,17 @@ def test_prepare_is_already_unicode(self): assert ctype == 'text/plain' assert cencoding == 'utf-8' + def test_publish_retry_policy(self): + p = self.connection.Producer() + p.channel = Mock() + p.channel.connection.client.declared_entities = set() + expected_retry_policy = { + 'max_retries': 20 + } + p.publish('hello', retry=True, retry_policy=expected_retry_policy) + + assert self.connection.transport_options == expected_retry_policy + def test_publish_with_Exchange_instance(self): p = self.connection.Producer() p.channel = Mock()