Skip to content

Commit

Permalink
fix(wpool): no allocation and correct ctx
Browse files Browse the repository at this point in the history
goos: darwin
goarch: arm64
pkg: github.com/cloudwego/kitex/internal/wpool
         │  ./old.txt   │             ./new.txt              │
         │    sec/op    │   sec/op     vs base               │
WPool-12   562.7n ± 18%   542.2n ± 5%  -3.64% (p=0.029 n=10)

         │  ./old.txt  │             ./new.txt              │
         │    B/op     │    B/op     vs base                │
WPool-12   12.000 ± 8%   7.000 ± 0%  -41.67% (p=0.000 n=10)

         │ ./old.txt  │              ./new.txt              │
         │ allocs/op  │ allocs/op   vs base                 │
WPool-12   1.000 ± 0%   0.000 ± 0%  -100.00% (p=0.000 n=10)
  • Loading branch information
xiaost committed Oct 10, 2024
1 parent 58c5848 commit 6338a89
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 27 deletions.
71 changes: 44 additions & 27 deletions internal/wpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import (
)

// Task is the function that the worker will execute.
type Task func()
type Task struct {
ctx context.Context
f func()
}

// Pool is a worker pool bind with some idle goroutines.
type Pool struct {
Expand Down Expand Up @@ -61,54 +64,68 @@ func (p *Pool) Size() int32 {
}

// Go creates/reuses a worker to run task.
func (p *Pool) Go(task Task) {
p.GoCtx(context.Background(), task)
func (p *Pool) Go(f func()) {
p.GoCtx(context.Background(), f)
}

// GoCtx creates/reuses a worker to run task.
func (p *Pool) GoCtx(ctx context.Context, task Task) {
func (p *Pool) GoCtx(ctx context.Context, f func()) {
t := Task{ctx: ctx, f: f}
select {
case p.tasks <- task:
case p.tasks <- t:
// reuse exist worker
return
default:
}

// create new worker
atomic.AddInt32(&p.size, 1)
go func() {
// single shot if p.size > p.maxIdle
if atomic.AddInt32(&p.size, 1) > p.maxIdle {
go func(t Task) {
defer func() {
if r := recover(); r != nil {
klog.Errorf("panic in wpool: error=%v: stack=%s", r, debug.Stack())
}
atomic.AddInt32(&p.size, -1)
}()
if profiler.IsEnabled(t.ctx) {
profiler.Tag(t.ctx)
t.f()
profiler.Untag(t.ctx)
} else {
t.f()
}
}(t)
return
}

// background goroutines for consuming tasks
go func(t Task) {
defer func() {
if r := recover(); r != nil {
klog.Errorf("panic in wpool: error=%v: stack=%s", r, debug.Stack())
}
atomic.AddInt32(&p.size, -1)
}()

profiler.Tag(ctx)
task()
profiler.Untag(ctx)

if atomic.LoadInt32(&p.size) > p.maxIdle {
return
}

// waiting for new task
idleTimer := time.NewTimer(p.maxIdleTime)
for {
if profiler.IsEnabled(t.ctx) {
profiler.Tag(t.ctx)
t.f()
profiler.Untag(t.ctx)
} else {
t.f()
}
idleTimer.Reset(p.maxIdleTime)
select {
case task = <-p.tasks:
profiler.Tag(ctx)
task()
profiler.Untag(ctx)
case t = <-p.tasks:
if !idleTimer.Stop() {
<-idleTimer.C
}
case <-idleTimer.C:
// worker exits
return
}

if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
}
}()
}(t)
}
13 changes: 13 additions & 0 deletions internal/wpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package wpool

import (
"context"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -55,3 +56,15 @@ func TestWPool(t *testing.T) {
}
test.Assert(t, p.Size() == 0)
}

func BenchmarkWPool(b *testing.B) {
maxIdleWorkers := runtime.GOMAXPROCS(0)
ctx := context.Background()
p := New(maxIdleWorkers, 10*time.Millisecond)
for i := 0; i < b.N; i++ {
p.GoCtx(ctx, func() {})
for int(p.Size()) > maxIdleWorkers {
runtime.Gosched()
}
}
}
6 changes: 6 additions & 0 deletions pkg/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func Tag(ctx context.Context) {
}
}

// IsEnabled returns true if profiler is enabled
// This func is short enough can be inlined, and likely it returns false
func IsEnabled(ctx context.Context) bool {
return ctx.Value(profilerContextKey{}) != nil
}

// Untag current goroutine with tagged ctx
// it's used for reuse goroutine scenario
func Untag(ctx context.Context) {
Expand Down

0 comments on commit 6338a89

Please sign in to comment.