Skip to content

Commit

Permalink
fix deadlock when GetChannelMediumOptions is used (#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Aug 1, 2024
1 parent 70fd357 commit 96afa72
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
5 changes: 4 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,11 @@ func (c *Client) checkPong() {
}
lastSeen := c.lastSeen
c.mu.RUnlock()
c.node.metrics.observePingPongDuration(time.Duration(lastSeen-lastPing)*time.Nanosecond, c.transport.Name())
if lastSeen < lastPing {
go func() { c.Disconnect(DisconnectNoPong) }()
return
}
c.node.metrics.observePingPongDuration(time.Duration(lastSeen-lastPing)*time.Nanosecond, c.transport.Name())
c.mu.Lock()
c.nextPong = 0
c.scheduleNextTimer()
Expand Down Expand Up @@ -1176,6 +1176,9 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
if c.lastPing <= 0 {
// No ping was issued, unnecessary pong.
c.mu.Unlock()
if c.node.LogEnabled(LogLevelDebug) {
c.node.logger.log(newLogEntry(LogLevelDebug, "disconnect client due to unnecessary pong", map[string]any{"client": c.ID(), "user": c.UserID()}))
}
return &DisconnectBadRequest, false
}
// upon receiving pong we change a sign of lastPing value. This way we can handle
Expand Down
36 changes: 30 additions & 6 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@ type Node struct {

emulationSurveyHandler *emulationSurveyHandler

mediums map[string]*channelMedium
mediums map[string]*channelMedium
mediumLocks map[int]*sync.Mutex // Sharded locks for mediums map.
}

const (
numSubLocks = 16384
numMediumLocks = 16384
numSubDissolverWorkers = 64
)

Expand Down Expand Up @@ -136,6 +138,11 @@ func New(c Config) (*Node, error) {
subLocks[i] = &sync.Mutex{}
}

mediumLocks := make(map[int]*sync.Mutex, numMediumLocks)
for i := 0; i < numMediumLocks; i++ {
mediumLocks[i] = &sync.Mutex{}
}

if c.Name == "" {
hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -164,6 +171,7 @@ func New(c Config) (*Node, error) {
nowTimeGetter: nowtime.Get,
surveyRegistry: make(map[uint64]chan survey),
mediums: map[string]*channelMedium{},
mediumLocks: mediumLocks,
}
n.emulationSurveyHandler = newEmulationSurveyHandler(n)

Expand Down Expand Up @@ -214,6 +222,10 @@ func (n *Node) subLock(ch string) *sync.Mutex {
return n.subLocks[index(ch, numSubLocks)]
}

func (n *Node) mediumLock(ch string) *sync.Mutex {
return n.mediumLocks[index(ch, numMediumLocks)]
}

// SetBroker allows setting Broker implementation to use.
func (n *Node) SetBroker(b Broker) {
n.broker = b
Expand Down Expand Up @@ -990,21 +1002,28 @@ func (n *Node) addSubscription(ch string, sub subInfo) error {
if mediumOptions.isMediumEnabled() {
medium, err := newChannelMedium(ch, n, mediumOptions)
if err != nil {
_, _ = n.hub.removeSub(ch, sub.client)
return err
}
mediumMu := n.mediumLock(ch)
mediumMu.Lock()
n.mediums[ch] = medium
mediumMu.Unlock()
}
}

err := n.getBroker(ch).Subscribe(ch)
if err != nil {
_, _ = n.hub.removeSub(ch, sub.client)
if n.config.GetChannelMediumOptions != nil {
mediumMu := n.mediumLock(ch)
mediumMu.Lock()
medium, ok := n.mediums[ch]
if ok {
medium.close()
delete(n.mediums, ch)
}
mediumMu.Unlock()
}
return err
}
Expand Down Expand Up @@ -1040,10 +1059,15 @@ func (n *Node) removeSubscription(ch string, c *Client) error {
// Cool down a bit since broker is not ready to process unsubscription.
time.Sleep(500 * time.Millisecond)
} else {
medium, ok := n.mediums[ch]
if ok {
medium.close()
delete(n.mediums, ch)
if n.config.GetChannelMediumOptions != nil {
mediumMu := n.mediumLock(ch)
mediumMu.Lock()
medium, ok := n.mediums[ch]
if ok {
medium.close()
delete(n.mediums, ch)
}
mediumMu.Unlock()
}
}
return err
Expand Down Expand Up @@ -1623,7 +1647,7 @@ func (n *Node) HandlePublication(ch string, pub *Publication, sp StreamPosition,
panic("nil Publication received, this must never happen")
}
if n.config.GetChannelMediumOptions != nil {
mu := n.subLock(ch)
mu := n.mediumLock(ch) // Note, avoid using subLock in HandlePublication – this leads to the deadlock.
mu.Lock()
medium, ok := n.mediums[ch]
mu.Unlock()
Expand Down

0 comments on commit 96afa72

Please sign in to comment.