Skip to content

Commit

Permalink
perf: move flush delay after WaitForWrite() to be more reactive
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Dec 11, 2022
1 parent 4be3a21 commit 0758427
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,21 +286,20 @@ func (p *pipe) _backgroundWrite() (err error) {
ones = make([]cmds.Completed, 1)
multi []cmds.Completed
ch chan RedisResult
delay = p.maxFlushDelay

flushDelay = p.maxFlushDelay
flushStart = time.Time{}
)

for atomic.LoadInt32(&p.state) < 3 {
if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil {
if flushDelay != 0 {
flushStart = time.Now()
}
if p.w.Buffered() == 0 {
err = p.Error()
} else {
if delay == 0 || atomic.LoadInt32(&p.waits) == 1 { // do not delay for sequential usage
err = p.w.Flush()
} else {
ts := time.Now()
err = p.w.Flush()
time.Sleep(delay - time.Since(ts)) // ref: https://github.com/rueian/rueidis/issues/156
}
err = p.w.Flush()
}
if err == nil {
if atomic.LoadInt32(&p.state) == 1 {
Expand All @@ -309,6 +308,9 @@ func (p *pipe) _backgroundWrite() (err error) {
runtime.Gosched()
continue
}
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/rueian/rueidis/issues/156
}
}
}
if ch != nil && multi == nil {
Expand Down

0 comments on commit 0758427

Please sign in to comment.