From 98dc8d42abb638a4371969214c93d96c5dc51cc5 Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Wed, 11 Sep 2024 11:54:24 +0200 Subject: [PATCH] [Issue 1272][connection] Attempt to avoid deadlock during reconnection (#1273) Fixes #1272 ### Motivation Producers and consumers register themselves with the connection to be notified when it gets closed. Even though the callback pushes the events into a channel, it can get stuck and the connection pool is locked, which prevents any other caller from getting a connection. ### Modifications This PR makes sure that the callback never blocks. --- pulsar/consumer_partition.go | 10 +++++++--- pulsar/producer_partition.go | 10 +++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d8001dc122..f307972c3c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -332,7 +332,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon startMessageID: atomicMessageID{msgID: options.startMessageID}, connectedCh: make(chan struct{}), messageCh: messageCh, - connectClosedCh: make(chan *connectionClosed, 10), + connectClosedCh: make(chan *connectionClosed, 1), closeCh: make(chan struct{}), clearQueueCh: make(chan func(id *trackingMessageID)), compressionProviders: sync.Map{}, @@ -1381,8 +1381,12 @@ func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseCons assignedBrokerURL = pc.client.selectServiceURL( closeConsumer.GetAssignedBrokerServiceUrl(), closeConsumer.GetAssignedBrokerServiceUrlTls()) } - pc.connectClosedCh <- &connectionClosed{ - assignedBrokerURL: assignedBrokerURL, + + select { + case pc.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}: + default: + // Reconnect has already been requested so we do not block the + // connection callback. 
} } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1677c57026..97ab8b942c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -171,7 +171,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions producerID: client.rpcClient.NewProducerID(), dataChan: make(chan *sendRequest, maxPendingMessages), cmdChan: make(chan interface{}, 10), - connectClosedCh: make(chan *connectionClosed, 10), + connectClosedCh: make(chan *connectionClosed, 1), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), @@ -413,8 +413,12 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu assignedBrokerURL = p.client.selectServiceURL( closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls()) } - p.connectClosedCh <- &connectionClosed{ - assignedBrokerURL: assignedBrokerURL, + + select { + case p.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}: + default: + // Reconnect has already been requested so we do not block the + // connection callback. } }