From 6338a89fd01da9a0793e847dca792e2840e87806 Mon Sep 17 00:00:00 2001 From: Kyle Xiao Date: Thu, 10 Oct 2024 14:12:52 +0800 Subject: [PATCH] fix(wpool): no allocation and correct ctx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit goos: darwin goarch: arm64 pkg: github.com/cloudwego/kitex/internal/wpool │ ./old.txt │ ./new.txt │ │ sec/op │ sec/op vs base │ WPool-12 562.7n ± 18% 542.2n ± 5% -3.64% (p=0.029 n=10) │ ./old.txt │ ./new.txt │ │ B/op │ B/op vs base │ WPool-12 12.000 ± 8% 7.000 ± 0% -41.67% (p=0.000 n=10) │ ./old.txt │ ./new.txt │ │ allocs/op │ allocs/op vs base │ WPool-12 1.000 ± 0% 0.000 ± 0% -100.00% (p=0.000 n=10) --- internal/wpool/pool.go | 71 +++++++++++++++++++++++-------------- internal/wpool/pool_test.go | 13 +++++++ pkg/profiler/profiler.go | 6 ++++ 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/internal/wpool/pool.go b/internal/wpool/pool.go index 8699bf84a1..dc40642220 100644 --- a/internal/wpool/pool.go +++ b/internal/wpool/pool.go @@ -32,7 +32,10 @@ import ( ) // Task is the function that the worker will execute. -type Task func() +type Task struct { + ctx context.Context + f func() +} // Pool is a worker pool bind with some idle goroutines. type Pool struct { @@ -61,54 +64,68 @@ func (p *Pool) Size() int32 { } // Go creates/reuses a worker to run task. -func (p *Pool) Go(task Task) { - p.GoCtx(context.Background(), task) +func (p *Pool) Go(f func()) { + p.GoCtx(context.Background(), f) } // GoCtx creates/reuses a worker to run task. -func (p *Pool) GoCtx(ctx context.Context, task Task) { +func (p *Pool) GoCtx(ctx context.Context, f func()) { + t := Task{ctx: ctx, f: f} select { - case p.tasks <- task: + case p.tasks <- t: // reuse exist worker return default: } - // create new worker - atomic.AddInt32(&p.size, 1) - go func() { + // single shot if p.size > p.maxIdle + if atomic.AddInt32(&p.size, 1) > p.maxIdle { + go func(t Task) { + defer func() { + if r := recover(); r != nil { + klog.Errorf("panic in wpool: error=%v: stack=%s", r, debug.Stack()) + } + atomic.AddInt32(&p.size, -1) + }() + if profiler.IsEnabled(t.ctx) { + profiler.Tag(t.ctx) + t.f() + profiler.Untag(t.ctx) + } else { + t.f() + } + }(t) + return + } + + // background goroutines for consuming tasks + go func(t Task) { defer func() { if r := recover(); r != nil { klog.Errorf("panic in wpool: error=%v: stack=%s", r, debug.Stack()) } atomic.AddInt32(&p.size, -1) }() - - profiler.Tag(ctx) - task() - profiler.Untag(ctx) - - if atomic.LoadInt32(&p.size) > p.maxIdle { - return - } - // waiting for new task idleTimer := time.NewTimer(p.maxIdleTime) for { + if profiler.IsEnabled(t.ctx) { + profiler.Tag(t.ctx) + t.f() + profiler.Untag(t.ctx) + } else { + t.f() + } + idleTimer.Reset(p.maxIdleTime) select { - case task = <-p.tasks: - profiler.Tag(ctx) - task() - profiler.Untag(ctx) + case t = <-p.tasks: + if !idleTimer.Stop() { + <-idleTimer.C + } case <-idleTimer.C: // worker exits return } - - if !idleTimer.Stop() { - <-idleTimer.C - } - idleTimer.Reset(p.maxIdleTime) } - }() + }(t) } diff --git a/internal/wpool/pool_test.go b/internal/wpool/pool_test.go index d8d030afc8..b933508be8 100644 --- a/internal/wpool/pool_test.go +++ b/internal/wpool/pool_test.go @@ -17,6 +17,7 @@ package wpool import ( + "context" "runtime" "sync" "sync/atomic" @@ -55,3 +56,15 @@ func TestWPool(t *testing.T) { } test.Assert(t, p.Size() == 0) } + +func BenchmarkWPool(b *testing.B) { + maxIdleWorkers := runtime.GOMAXPROCS(0) + ctx := context.Background() + p := New(maxIdleWorkers, 10*time.Millisecond) + for i := 0; i < b.N; i++ { + p.GoCtx(ctx, func() {}) + for int(p.Size()) > maxIdleWorkers { + runtime.Gosched() + } + } +} diff --git a/pkg/profiler/profiler.go b/pkg/profiler/profiler.go index d52683b297..ff3e42c9a8 100644 --- a/pkg/profiler/profiler.go +++ b/pkg/profiler/profiler.go @@ -106,6 +106,12 @@ func Tag(ctx context.Context) { } } +// IsEnabled returns true if profiler is enabled +// This func is short enough can be inlined, and likely it returns false +func IsEnabled(ctx context.Context) bool { + return ctx.Value(profilerContextKey{}) != nil +} + // Untag current goroutine with tagged ctx // it's used for reuse goroutine scenario func Untag(ctx context.Context) {