Skip to content

Commit

Permalink
fix(waiter): use a buffered channel to fix race condition
Browse files Browse the repository at this point in the history
We discovered a race condition in where `Waiter.Set` and `Waiter.Next`
could both be called once, but `Waiter.Next` would never resolve with
the result.

The reason for this is that `sync.Cond` does not appear to retain any
kind of state, so if `w.c.Broadcast` is called first, `w.c.Wait` does
not know that it can continue.

By using a buffered channel instead we can get around that issue.

According to the benchmark tests this is actually faster too: decreasing
the ops per ns from ~200 to ~150.

Signed-off-by: Matthew Kocher <mkocher@vmware.com>
Co-authored-by: Matthew Kocher <mkocher@vmware.com>
  • Loading branch information
ctlong and mkocher committed Sep 20, 2023
1 parent c04be32 commit 5ec396c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 58 deletions.
37 changes: 11 additions & 26 deletions waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package diodes

import (
"context"
"sync"
)

// Waiter will use a conditional mutex to alert the reader to when data is
// available.
type Waiter struct {
Diode
mu sync.Mutex
c *sync.Cond
c chan struct{}
ctx context.Context
}

Expand All @@ -30,54 +28,41 @@ func WithWaiterContext(ctx context.Context) WaiterConfigOption {
func NewWaiter(d Diode, opts ...WaiterConfigOption) *Waiter {
w := new(Waiter)
w.Diode = d
w.c = sync.NewCond(&w.mu)
w.c = make(chan struct{}, 1)
w.ctx = context.Background()

for _, opt := range opts {
opt(w)
}

go func() {
<-w.ctx.Done()
w.c.Broadcast()
}()

return w
}

// Set invokes the wrapped diode's Set with the given data and uses Broadcast
// to wake up any readers.
func (w *Waiter) Set(data GenericDataType) {
w.Diode.Set(data)
w.c.Broadcast()
select {
case w.c <- struct{}{}:
default:
return
}
}

// Next returns the next data point on the wrapped diode. If there is not any
// new data, it will Wait for set to be called or the context to be done.
// If the context is done, then nil will be returned.
func (w *Waiter) Next() GenericDataType {
w.mu.Lock()
defer w.mu.Unlock()

for {
data, ok := w.Diode.TryNext()
if !ok {
if w.isDone() {
select {
case <-w.ctx.Done():
return nil
case <-w.c:
continue
}

w.c.Wait()
continue
}
return data
}
}

func (w *Waiter) isDone() bool {
select {
case <-w.ctx.Done():
return true
default:
return false
}
}
85 changes: 53 additions & 32 deletions waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,65 @@ var _ = Describe("Waiter", func() {
w = diodes.NewWaiter(spy)
})

It("returns the available result", func() {
spy.dataList = [][]byte{[]byte("a"), []byte("b")}
Describe("Next", func() {
BeforeEach(func() {
spy.dataList = [][]byte{[]byte("a"), []byte("b")}
})

Expect(*(*[]byte)(w.Next())).To(Equal([]byte("a")))
Expect(*(*[]byte)(w.Next())).To(Equal([]byte("b")))
})
It("returns available data points from the wrapped diode", func() {
Expect(spy.called).To(Equal(0))
Expect(*(*[]byte)(w.Next())).To(Equal([]byte("a")))
Expect(spy.called).To(Equal(1))
Expect(*(*[]byte)(w.Next())).To(Equal([]byte("b")))
Expect(spy.called).To(Equal(2))
})

It("waits until data is available", func() {
go func() {
time.Sleep(250 * time.Millisecond)
data := []byte("a")
w.Set(diodes.GenericDataType(&data))
}()
Context("when there is no new data", func() {
BeforeEach(func() {
spy.dataList = nil
})

Expect(*(*[]byte)(w.Next())).To(Equal([]byte("a")))
})
It("waits for Set to be called", func() {
go func() {
time.Sleep(250 * time.Millisecond)
data := []byte("c")
w.Set(diodes.GenericDataType(&data))
}()
Expect(spy.called).To(Equal(0))
Expect(*(*[]byte)(w.Next())).To(Equal([]byte("c")))
Expect(spy.called).To(Equal(2)) // Calls TryNext twice during wait loop
})

It("cancels Next() with context", func() {
ctx, cancel := context.WithCancel(context.Background())
w = diodes.NewWaiter(spy, diodes.WithWaiterContext(ctx))
cancel()
done := make(chan struct{})
go func() {
defer close(done)
w.Next()
}()
Context("when the context is cancelled", func() {
var cancel context.CancelFunc

Eventually(done).Should(BeClosed())
})
BeforeEach(func() {
var ctx context.Context
ctx, cancel = context.WithCancel(context.Background())
w = diodes.NewWaiter(spy, diodes.WithWaiterContext(ctx))
})

It("cancels current Next() with context", func() {
ctx, cancel := context.WithCancel(context.Background())
w = diodes.NewWaiter(spy, diodes.WithWaiterContext(ctx))
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
Context("beforehand", func() {
It("returns nil", func() {
cancel()
Expect(spy.called).To(Equal(0))
Expect(w.Next() == nil).To(BeTrue())
Expect(spy.called).To(Equal(1))
})
})

Expect(w.Next() == nil).To(BeTrue())
Context("while waiting", func() {
It("returns nil", func() {
go func() {
time.Sleep(250 * time.Millisecond)
cancel()
}()
Expect(spy.called).To(Equal(0))
Expect(w.Next() == nil).To(BeTrue())
Expect(spy.called).To(Equal(1))
})
})
})
})
})
})

0 comments on commit 5ec396c

Please sign in to comment.