diff --git a/client.go b/client.go
index 19d23ef3..2eb84099 100644
--- a/client.go
+++ b/client.go
@@ -518,11 +518,11 @@ func (c *Client) checkPong() {
 	}
 	lastSeen := c.lastSeen
 	c.mu.RUnlock()
-	c.node.metrics.observePingPongDuration(time.Duration(lastSeen-lastPing)*time.Nanosecond, c.transport.Name())
 	if lastSeen < lastPing {
 		go func() { c.Disconnect(DisconnectNoPong) }()
 		return
 	}
+	c.node.metrics.observePingPongDuration(time.Duration(lastSeen-lastPing)*time.Nanosecond, c.transport.Name())
 	c.mu.Lock()
 	c.nextPong = 0
 	c.scheduleNextTimer()
@@ -1176,6 +1176,9 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
 		if c.lastPing <= 0 {
 			// No ping was issued, unnecessary pong.
 			c.mu.Unlock()
+			if c.node.LogEnabled(LogLevelDebug) {
+				c.node.logger.log(newLogEntry(LogLevelDebug, "disconnect client due to unnecessary pong", map[string]any{"client": c.ID(), "user": c.UserID()}))
+			}
 			return &DisconnectBadRequest, false
 		}
 		// upon receiving pong we change a sign of lastPing value. This way we can handle
diff --git a/node.go b/node.go
index 8dc63af1..d8611519 100644
--- a/node.go
+++ b/node.go
@@ -84,11 +84,13 @@ type Node struct {
 
 	emulationSurveyHandler *emulationSurveyHandler
 
-	mediums map[string]*channelMedium
+	mediums     map[string]*channelMedium
+	mediumLocks map[int]*sync.Mutex // Sharded locks for mediums map.
 }
 
 const (
 	numSubLocks            = 16384
+	numMediumLocks         = 16384
 	numSubDissolverWorkers = 64
 )
 
@@ -136,6 +138,11 @@ func New(c Config) (*Node, error) {
 		subLocks[i] = &sync.Mutex{}
 	}
 
+	mediumLocks := make(map[int]*sync.Mutex, numMediumLocks)
+	for i := 0; i < numMediumLocks; i++ {
+		mediumLocks[i] = &sync.Mutex{}
+	}
+
 	if c.Name == "" {
 		hostname, err := os.Hostname()
 		if err != nil {
@@ -164,6 +171,7 @@ func New(c Config) (*Node, error) {
 		nowTimeGetter:  nowtime.Get,
 		surveyRegistry: make(map[uint64]chan survey),
 		mediums:        map[string]*channelMedium{},
+		mediumLocks:    mediumLocks,
 	}
 
 	n.emulationSurveyHandler = newEmulationSurveyHandler(n)
@@ -214,6 +222,10 @@ func (n *Node) subLock(ch string) *sync.Mutex {
 	return n.subLocks[index(ch, numSubLocks)]
 }
 
+func (n *Node) mediumLock(ch string) *sync.Mutex {
+	return n.mediumLocks[index(ch, numMediumLocks)]
+}
+
// SetBroker allows setting Broker implementation to use.
 func (n *Node) SetBroker(b Broker) {
 	n.broker = b
@@ -990,9 +1002,13 @@ func (n *Node) addSubscription(ch string, sub subInfo) error {
 		if mediumOptions.isMediumEnabled() {
 			medium, err := newChannelMedium(ch, n, mediumOptions)
 			if err != nil {
+				_, _ = n.hub.removeSub(ch, sub.client)
 				return err
 			}
+			mediumMu := n.mediumLock(ch)
+			mediumMu.Lock()
 			n.mediums[ch] = medium
+			mediumMu.Unlock()
 		}
 	}
 
@@ -1000,11 +1016,14 @@ func (n *Node) addSubscription(ch string, sub subInfo) error {
 	if err != nil {
 		_, _ = n.hub.removeSub(ch, sub.client)
 		if n.config.GetChannelMediumOptions != nil {
+			mediumMu := n.mediumLock(ch)
+			mediumMu.Lock()
 			medium, ok := n.mediums[ch]
 			if ok {
 				medium.close()
 				delete(n.mediums, ch)
 			}
+			mediumMu.Unlock()
 		}
 		return err
 	}
@@ -1040,10 +1059,15 @@ func (n *Node) removeSubscription(ch string, c *Client) error {
 			// Cool down a bit since broker is not ready to process unsubscription.
 			time.Sleep(500 * time.Millisecond)
 		} else {
-			medium, ok := n.mediums[ch]
-			if ok {
-				medium.close()
-				delete(n.mediums, ch)
+			if n.config.GetChannelMediumOptions != nil {
+				mediumMu := n.mediumLock(ch)
+				mediumMu.Lock()
+				medium, ok := n.mediums[ch]
+				if ok {
+					medium.close()
+					delete(n.mediums, ch)
+				}
+				mediumMu.Unlock()
 			}
 		}
 		return err
@@ -1623,7 +1647,7 @@ func (n *Node) HandlePublication(ch string, pub *Publication, sp StreamPosition,
 		panic("nil Publication received, this must never happen")
 	}
 	if n.config.GetChannelMediumOptions != nil {
-		mu := n.subLock(ch)
+		mu := n.mediumLock(ch) // Note: avoid using subLock in HandlePublication – this leads to a deadlock.
 		mu.Lock()
 		medium, ok := n.mediums[ch]
 		mu.Unlock()
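
For context on the sharding scheme introduced above: mediumLock hashes the channel name to one of numMediumLocks mutexes, so all operations on the same channel contend on the same mutex while unrelated channels rarely do. The index helper is not part of this diff; the sketch below is a minimal, self-contained illustration of the pattern, assuming an FNV-1a hash modulo the shard count (a common choice, not necessarily the actual implementation).

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numMediumLocks = 16384

// index maps a channel name to a shard number in [0, numBuckets).
// The real index helper lives outside this diff; FNV-1a modulo the
// bucket count is an assumption for illustration only.
func index(ch string, numBuckets int) int {
	h := fnv.New64a()
	_, _ = h.Write([]byte(ch))
	return int(h.Sum64() % uint64(numBuckets))
}

func main() {
	// Mirrors the diff: one mutex per shard, all created once at startup.
	mediumLocks := make(map[int]*sync.Mutex, numMediumLocks)
	for i := 0; i < numMediumLocks; i++ {
		mediumLocks[i] = &sync.Mutex{}
	}

	// The same channel always maps to the same mutex, which is all that
	// guarding the mediums map entry for that channel requires.
	ch := "news"
	mu := mediumLocks[index(ch, numMediumLocks)]
	mu.Lock()
	fmt.Printf("channel %q guarded by shard %d\n", ch, index(ch, numMediumLocks))
	mu.Unlock()
}

Sharding bounds contention without paying for one mutex per channel. Keeping mediumLocks separate from subLocks also matters for the HandlePublication hunk: per the diff's own comment, taking subLock on the publication path would deadlock against the subscription paths, so the mediums map gets its own lock set.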