From 0758427fdee967e899ba64b0b2d87cfd13294a13 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 11 Dec 2022 21:13:41 +0800 Subject: [PATCH] perf: move flush delay after WaitForWrite() to be more reactive --- pipe.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pipe.go b/pipe.go index de852a91..e33378fa 100644 --- a/pipe.go +++ b/pipe.go @@ -286,21 +286,20 @@ func (p *pipe) _backgroundWrite() (err error) { ones = make([]cmds.Completed, 1) multi []cmds.Completed ch chan RedisResult - delay = p.maxFlushDelay + + flushDelay = p.maxFlushDelay + flushStart = time.Time{} ) for atomic.LoadInt32(&p.state) < 3 { if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil { + if flushDelay != 0 { + flushStart = time.Now() + } if p.w.Buffered() == 0 { err = p.Error() } else { - if delay == 0 || atomic.LoadInt32(&p.waits) == 1 { // do not delay for sequential usage - err = p.w.Flush() - } else { - ts := time.Now() - err = p.w.Flush() - time.Sleep(delay - time.Since(ts)) // ref: https://github.com/rueian/rueidis/issues/156 - } + err = p.w.Flush() } if err == nil { if atomic.LoadInt32(&p.state) == 1 { @@ -309,6 +308,9 @@ func (p *pipe) _backgroundWrite() (err error) { runtime.Gosched() continue } + if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage + time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/rueian/rueidis/issues/156 + } } } if ch != nil && multi == nil {