From c69bd65538989f8d5146f619c0320b95cc3c90de Mon Sep 17 00:00:00 2001 From: Haim Daniel Date: Wed, 23 Oct 2024 17:58:15 +0300 Subject: [PATCH] Fix: restrict google protobuf version - protobuf version in versions larger than 4.25.5 when used by google-cloud-pubsub raises the following error: site-packages/google/protobuf/internal/well_known_types.py", line 443, in FromTimedelta raise AttributeError( AttributeError: Fail to convert to Duration. Expected a timedelta like object got str: 'str' object has no attribute 'seconds' Fix this by restriction of the allowed package versions - Added unit test to validate pubsub and protobuf compatibility - Enabled google-cloud-pubsub package versions bump --- requirements/extras/gcpubsub.txt | 3 ++- t/unit/transport/test_gcpubsub.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/requirements/extras/gcpubsub.txt b/requirements/extras/gcpubsub.txt index c4c1b3c60..8221bb0c6 100644 --- a/requirements/extras/gcpubsub.txt +++ b/requirements/extras/gcpubsub.txt @@ -1,3 +1,4 @@ -google-cloud-pubsub>=2.18.4,<=2.20.3 +google-cloud-pubsub>=2.18.4 google-cloud-monitoring>=2.16.0 grpcio==1.67.0 +protobuf==4.25.5 diff --git a/t/unit/transport/test_gcpubsub.py b/t/unit/transport/test_gcpubsub.py index 5e60336b6..c617a329e 100644 --- a/t/unit/transport/test_gcpubsub.py +++ b/t/unit/transport/test_gcpubsub.py @@ -9,6 +9,7 @@ from _socket import timeout as socket_timeout from google.api_core.exceptions import (AlreadyExists, DeadlineExceeded, PermissionDenied) +from google.pubsub_v1.types.pubsub import Subscription from kombu.transport.gcpubsub import (AtomicCounter, Channel, QueueDescriptor, Transport, UnackedIds) @@ -283,6 +284,17 @@ def test_create_subscription(self, channel): assert result == subscription_path channel.subscriber.create_subscription.assert_called_once() + def test_create_subscription_protobuf_compat(self): + request = { + 'name': 'projects/my_project/subscriptions/kombu-1111-2222', + 'topic': 'projects/jether-fox/topics/reply.celery.pidbox', + 'ack_deadline_seconds': 240, + 'expiration_policy': {'ttl': '86400s'}, + 'message_retention_duration': '86400s', + 'filter': 'attributes.routing_key="1111-2222"', + } + Subscription(request) + def test_delete(self, channel): queue = "test_queue" subscription_path = "projects/project-id/subscriptions/test_queue"