From 7ae0059fe7464431cf11f916e44216bdf536068f Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Wed, 11 Sep 2024 05:16:49 +0200 Subject: [PATCH] Refactor connection concurrency model (#1275) --- pulsar/internal/connection.go | 221 ++++++++++++++-------------------- 1 file changed, 93 insertions(+), 128 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index c1f38f8981..462cfe885a 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -35,8 +35,6 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" - - ua "go.uber.org/atomic" ) const ( @@ -87,7 +85,7 @@ type Connection interface { ID() string GetMaxMessageSize() int32 Close() - WaitForClose() <-chan interface{} + WaitForClose() <-chan struct{} IsProxied() bool } @@ -108,7 +106,6 @@ type connectionState int32 const ( connectionInit = iota connectionReady - connectionClosing connectionClosed ) @@ -118,8 +115,6 @@ func (s connectionState) String() string { return "Initializing" case connectionReady: return "Ready" - case connectionClosing: - return "Closing" case connectionClosed: return "Closed" default: @@ -139,16 +134,19 @@ type incomingCmd struct { } type connection struct { - sync.Mutex - cond *sync.Cond started int32 - state ua.Int32 connectionTimeout time.Duration closeOnce sync.Once + // mu protects the fields below against concurrency accesses. + mu sync.RWMutex + state atomic.Int32 + cnx net.Conn + listeners map[uint64]ConnectionListener + consumerHandlers map[uint64]ConsumerHandler + logicalAddr *url.URL physicalAddr *url.URL - cnx net.Conn writeBufferLock sync.Mutex writeBuffer Buffer @@ -162,18 +160,13 @@ type connection struct { incomingRequestsWG sync.WaitGroup incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd - closeCh chan interface{} + closeCh chan struct{} + readyCh chan struct{} writeRequestsCh chan Buffer pendingLock sync.Mutex pendingReqs map[uint64]*request - listenersLock sync.RWMutex - listeners map[uint64]ConnectionListener - - consumerHandlersLock sync.RWMutex - consumerHandlers map[uint64]ConsumerHandler - tlsOptions *TLSOptions auth auth.Provider @@ -210,7 +203,8 @@ func newConnection(opts connectionOptions) *connection { tlsOptions: opts.tls, auth: opts.auth, - closeCh: make(chan interface{}), + closeCh: make(chan struct{}), + readyCh: make(chan struct{}), incomingRequestsCh: make(chan *request, 10), incomingCmdCh: make(chan *incomingCmd, 10), @@ -224,9 +218,8 @@ func newConnection(opts connectionOptions) *connection { consumerHandlers: make(map[uint64]ConsumerHandler), metrics: opts.metrics, } - cnx.setState(connectionInit) + cnx.state.Store(int32(connectionInit)) cnx.reader = newConnectionReader(cnx) - cnx.cond = sync.NewCond(cnx) return cnx } @@ -289,11 +282,11 @@ func (c *connection) connect() bool { return false } - c.Lock() + c.mu.Lock() c.cnx = cnx c.log = c.log.SubLogger(log.Fields{"local_addr": c.cnx.LocalAddr()}) c.log.Info("TCP connection established") - c.Unlock() + c.mu.Unlock() return true } @@ -347,7 +340,8 @@ func (c *connection) doHandshake() bool { } c.log.Info("Connection is ready") c.setLastDataReceived(time.Now()) - c.changeState(connectionReady) + c.setStateReady() + close(c.readyCh) // broadcast the readiness of the connection. return true } @@ -356,22 +350,13 @@ func (c *connection) IsProxied() bool { } func (c *connection) waitUntilReady() error { - // If we are going to call cond.Wait() at all, then we must call it _before_ we call cond.Broadcast(). - // The lock is held here to prevent changeState() from calling cond.Broadcast() in the time between - // the state check and call to cond.Wait(). - c.Lock() - defer c.Unlock() - - for c.getState() != connectionReady { - c.log.Debugf("Wait until connection is ready state=%s", c.getState().String()) - if c.getState() == connectionClosed { - return errors.New("connection error") - } - // wait for a new connection state change - c.cond.Wait() + select { + case <-c.readyCh: + return nil + case <-c.closeCh: + // Connection has been closed while waiting for the readiness. + return errors.New("connection error") } - - return nil } func (c *connection) failLeftRequestsWhenClose() { @@ -492,8 +477,8 @@ func (c *connection) WriteData(data Buffer) { // 1. blocked, in which case we need to wait until we have space // 2. the connection is already closed, then we need to bail out c.log.Debug("Couldn't write on connection channel immediately") - state := c.getState() - if state != connectionReady { + + if c.getState() != connectionReady { c.log.Debug("Connection was already closed") return } @@ -648,8 +633,7 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, c.incomingRequestsWG.Add(1) defer c.incomingRequestsWG.Done() - state := c.getState() - if state == connectionClosed || state == connectionClosing { + if c.getState() == connectionClosed { callback(req, ErrConnectionClosed) } else { @@ -670,8 +654,7 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error { c.incomingRequestsWG.Add(1) defer c.incomingRequestsWG.Done() - state := c.getState() - if state == connectionClosed || state == connectionClosing { + if c.getState() == connectionClosed { return ErrConnectionClosed } @@ -751,9 +734,9 @@ func (c *connection) handleAckResponse(ackResponse *pb.CommandAckResponse) { func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) { producerID := response.GetProducerId() - c.listenersLock.RLock() + c.mu.RLock() producer, ok := c.listeners[producerID] - c.listenersLock.RUnlock() + c.mu.RUnlock() if ok { producer.ReceivedSendReceipt(response) @@ -893,12 +876,13 @@ func (c *connection) handleSendError(sendError *pb.CommandSendError) { } func (c *connection) deletePendingProducers(producerID uint64) (ConnectionListener, bool) { - c.listenersLock.Lock() + c.mu.Lock() + defer c.mu.Unlock() + producer, ok := c.listeners[producerID] if ok { delete(c.listeners, producerID) } - c.listenersLock.Unlock() return producer, ok } @@ -960,9 +944,9 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi return } if commandTopicMigrated.GetResourceType() == pb.CommandTopicMigrated_Producer { - c.listenersLock.RLock() + c.mu.RLock() producer, ok := c.listeners[resourceID] - c.listenersLock.RUnlock() + c.mu.RUnlock() if ok { producer.SetRedirectedClusterURI(migratedBrokerServiceURL) c.log.Infof("producerID:{%d} migrated to RedirectedClusterURI:{%s}", @@ -984,73 +968,55 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi } func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error { - // do not add if connection is closed - if c.closed() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.getState() == connectionClosed { c.log.Warnf("Connection closed unable register listener id=%+v", id) return errUnableRegisterListener } - c.listenersLock.Lock() - defer c.listenersLock.Unlock() - c.listeners[id] = listener return nil } func (c *connection) UnregisterListener(id uint64) { - c.listenersLock.Lock() - defer c.listenersLock.Unlock() + c.mu.Lock() + defer c.mu.Unlock() delete(c.listeners, id) } func (c *connection) ResetLastActive() { - c.Lock() - defer c.Unlock() + c.mu.Lock() + defer c.mu.Unlock() + c.lastActive = time.Now() } func (c *connection) isIdle() bool { - { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() - if len(c.pendingReqs) != 0 { - return false - } - } - - { - c.listenersLock.RLock() - defer c.listenersLock.RUnlock() - if len(c.listeners) != 0 { - return false - } - } - - { - c.consumerHandlersLock.Lock() - defer c.consumerHandlersLock.Unlock() - if len(c.consumerHandlers) != 0 { - return false - } - } - - if len(c.incomingRequestsCh) != 0 || len(c.writeRequestsCh) != 0 { - return false - } - return true + return len(c.listeners) == 0 && + len(c.consumerHandlers) == 0 && + len(c.incomingRequestsCh) == 0 && + len(c.writeRequestsCh) == 0 } func (c *connection) CheckIdle(maxIdleTime time.Duration) bool { - // We don't need to lock here because this method should only be - // called in a single goroutine of the connectionPool - if !c.isIdle() { + c.pendingLock.Lock() + sizePendingReqs := len(c.pendingReqs) + c.pendingLock.Unlock() + + c.mu.Lock() + defer c.mu.Unlock() + + if sizePendingReqs != 0 || !c.isIdle() { c.lastActive = time.Now() } + return time.Since(c.lastActive) > maxIdleTime } -func (c *connection) WaitForClose() <-chan interface{} { +func (c *connection) WaitForClose() <-chan struct{} { return c.closeCh } @@ -1059,10 +1025,7 @@ func (c *connection) WaitForClose() <-chan interface{} { // This also triggers callbacks to the ConnectionClosed listeners. func (c *connection) Close() { c.closeOnce.Do(func() { - c.Lock() - cnx := c.cnx - c.Unlock() - c.changeState(connectionClosed) + listeners, consumerHandlers, cnx := c.closeAndEmptyObservers() if cnx != nil { _ = cnx.Close() @@ -1070,22 +1033,6 @@ func (c *connection) Close() { close(c.closeCh) - listeners := make(map[uint64]ConnectionListener) - c.listenersLock.Lock() - for id, listener := range c.listeners { - listeners[id] = listener - delete(c.listeners, id) - } - c.listenersLock.Unlock() - - consumerHandlers := make(map[uint64]ConsumerHandler) - c.consumerHandlersLock.Lock() - for id, handler := range c.consumerHandlers { - consumerHandlers[id] = handler - delete(c.consumerHandlers, id) - } - c.consumerHandlersLock.Unlock() - // notify producers connection closed for _, listener := range listeners { listener.ConnectionClosed(nil) @@ -1100,22 +1047,35 @@ func (c *connection) Close() { }) } -func (c *connection) changeState(state connectionState) { - // The lock is held here because we need setState() and cond.Broadcast() to be - // an atomic operation from the point of view of waitUntilReady(). - c.Lock() - defer c.Unlock() +func (c *connection) closeAndEmptyObservers() ([]ConnectionListener, []ConsumerHandler, net.Conn) { + c.mu.Lock() + defer c.mu.Unlock() + + c.setStateClosed() + + listeners := make([]ConnectionListener, 0, len(c.listeners)) + for _, listener := range c.listeners { + listeners = append(listeners, listener) + } - c.setState(state) - c.cond.Broadcast() + handlers := make([]ConsumerHandler, 0, len(c.consumerHandlers)) + for _, handler := range c.consumerHandlers { + handlers = append(handlers, handler) + } + + return listeners, handlers, c.cnx } func (c *connection) getState() connectionState { return connectionState(c.state.Load()) } -func (c *connection) setState(state connectionState) { - c.state.Store(int32(state)) +func (c *connection) setStateReady() { + c.state.CompareAndSwap(int32(connectionInit), int32(connectionReady)) +} + +func (c *connection) setStateClosed() { + c.state.Store(int32(connectionClosed)) } func (c *connection) closed() bool { @@ -1173,32 +1133,37 @@ func (c *connection) getTLSConfig() (*tls.Config, error) { } func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) error { - // do not add if connection is closed - if c.closed() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.getState() == connectionClosed { c.log.Warnf("Closed connection unable add consumer with id=%+v", id) return errUnableAddConsumeHandler } - c.consumerHandlersLock.Lock() - defer c.consumerHandlersLock.Unlock() c.consumerHandlers[id] = handler return nil } func (c *connection) DeleteConsumeHandler(id uint64) { - c.consumerHandlersLock.Lock() - defer c.consumerHandlersLock.Unlock() + c.mu.Lock() + defer c.mu.Unlock() + delete(c.consumerHandlers, id) } func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) { - c.consumerHandlersLock.RLock() - defer c.consumerHandlersLock.RUnlock() + c.mu.RLock() + defer c.mu.RUnlock() + h, ok := c.consumerHandlers[id] return h, ok } func (c *connection) ID() string { + c.mu.RLock() + defer c.mu.RUnlock() + return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr()) }