Skip to content

Commit

Permalink
distributor: fix pool buffer reuse logic when `distributor.max-reques…
Browse files Browse the repository at this point in the history
…t-pool-buffer-size` is set.

Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
  • Loading branch information
ortuman committed Oct 18, 2024
1 parent 2af261a commit 3b074b1
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* [BUGFIX] Fix issue where sharded queries could return annotations with incorrect or confusing position information. #9536
* [BUGFIX] Fix issue where downstream consumers may not generate correct cache keys for experimental error caching. #9644
* [BUGFIX] Fix issue where active series requests error when encountering a stale posting. #9580
* [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666

### Mixin

Expand Down
7 changes: 1 addition & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

var requestBufferPool util.Pool
if cfg.MaxRequestPoolBufferSize > 0 {
requestBufferPool = util.NewBucketedBufferPool(1<<10, cfg.MaxRequestPoolBufferSize, 4)
} else {
requestBufferPool = util.NewBufferPool()
}
requestBufferPool := util.NewBufferPool(cfg.MaxRequestPoolBufferSize)

d := &Distributor{
cfg: cfg,
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {

return nil
}
h := OTLPHandler(200, util.NewBufferPool(), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
srv.HTTP.Handle("/otlp", h)

// start the server
Expand Down
36 changes: 18 additions & 18 deletions pkg/util/requestbuffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,45 @@ package util
import (
"bytes"
"sync"

"github.com/grafana/mimir/pkg/util/pool"
)

const defaultPoolBufferCap = 256 * 1024

// Pool is an abstraction for a pool of byte slices.
type Pool interface {
// Get returns a new byte slices that fits the given size.
Get(sz int) []byte
// Get returns a new byte slices.
Get() []byte

// Put puts a slice back into the pool.
Put(s []byte)
}

type bufferPool struct {
p sync.Pool
maxBufferCap int
p sync.Pool
}

func (p *bufferPool) Get(_ int) []byte { return p.p.Get().([]byte) }
func (p *bufferPool) Put(s []byte) { p.p.Put(s) } //nolint:staticcheck
func (p *bufferPool) Get() []byte { return p.p.Get().([]byte) }
func (p *bufferPool) Put(s []byte) {
if p.maxBufferCap > 0 && cap(s) > p.maxBufferCap {
return // Discard large buffers
}
p.p.Put(s) //nolint:staticcheck
}

// NewBufferPool returns a new Pool for byte slices.
func NewBufferPool() Pool {
// If maxBufferCapacity is 0, the pool will not have a maximum capacity.
func NewBufferPool(maxBufferCapacity int) Pool {
return &bufferPool{
maxBufferCap: maxBufferCapacity,
p: sync.Pool{
New: func() interface{} {
return make([]byte, 0, 256*1024)
return make([]byte, 0, defaultPoolBufferCap)
},
},
}
}

// NewBucketedBufferPool returns a new Pool for byte slices with bucketing.
// The pool will have buckets for sizes from minSize to maxSize increasing by the given factor.
func NewBucketedBufferPool(minSize, maxSize int, factor float64) Pool {
return pool.NewBucketedPool(minSize, maxSize, factor, func(sz int) []byte {
return make([]byte, 0, sz)
})
}

// RequestBuffers provides pooled request buffers.
type RequestBuffers struct {
p Pool
Expand All @@ -70,7 +70,7 @@ func (rb *RequestBuffers) Get(size int) *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, size))
}

b := rb.p.Get(size)
b := rb.p.Get()
buf := bytes.NewBuffer(b)
buf.Reset()
if size > 0 {
Expand Down
75 changes: 47 additions & 28 deletions pkg/util/requestbuffers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ import (
)

func TestRequestBuffers(t *testing.T) {
const maxBufferSize = 32 * 1024

rb := NewRequestBuffers(&fakePool{maxBufferSize: maxBufferSize})
rb := NewRequestBuffers(&fakePool{})
t.Cleanup(rb.CleanUp)

b := rb.Get(1024)
require.NotNil(t, b)
assert.Equal(t, 1024, b.Cap())
assert.GreaterOrEqual(t, b.Cap(), 1024)
assert.Zero(t, b.Len())
// Make sure that the buffer gets reset upon next Get
_, err := b.Write([]byte("test"))
Expand All @@ -30,20 +28,9 @@ func TestRequestBuffers(t *testing.T) {
// to test if it reuses the previously returned buffer.
b1 := rb.Get(1024)
assert.Same(t, unsafe.SliceData(b1.Bytes()), unsafe.SliceData(b.Bytes()))
assert.Equal(t, 1024, b1.Cap())
assert.GreaterOrEqual(t, b1.Cap(), 1024)
assert.Zero(t, b1.Len())

// Retrieve a buffer larger than maxBufferSize to ensure
// it doesn't get reused.
b2 := rb.Get(maxBufferSize + 1)
assert.Equal(t, maxBufferSize+1, b2.Cap())
assert.Zero(t, b2.Len())

rb.CleanUp()

b3 := rb.Get(maxBufferSize + 1)
assert.NotSame(t, unsafe.SliceData(b2.Bytes()), unsafe.SliceData(b3.Bytes()))

t.Run("as nil pointer", func(t *testing.T) {
var rb *RequestBuffers
b := rb.Get(1024)
Expand All @@ -60,24 +47,56 @@ func TestRequestBuffers(t *testing.T) {
})
}

func TestRequestsBuffersMaxPoolBufferSize(t *testing.T) {
const maxPoolBufferCap = defaultPoolBufferCap

t.Run("pool buffer is reused when size is less or equal to maxBufferSize", func(t *testing.T) {
rb := NewRequestBuffers(NewBufferPool(maxPoolBufferCap))
t.Cleanup(rb.CleanUp)

b0 := rb.Get(maxPoolBufferCap)
require.NotNil(t, b0)
assert.Zero(t, b0.Len())
assert.GreaterOrEqual(t, b0.Cap(), maxPoolBufferCap)

rb.CleanUp()

b1 := rb.Get(maxPoolBufferCap)
assert.Same(t, unsafe.SliceData(b0.Bytes()), unsafe.SliceData(b1.Bytes()))
})
t.Run("pool buffer is not reused when size is greater than maxBufferSize", func(t *testing.T) {
rb := NewRequestBuffers(NewBufferPool(maxPoolBufferCap))
t.Cleanup(rb.CleanUp)

b0 := rb.Get(maxPoolBufferCap + 1)
require.NotNil(t, b0)
assert.Zero(t, b0.Len())
assert.GreaterOrEqual(t, b0.Cap(), maxPoolBufferCap+1)

rb.CleanUp()

b1 := rb.Get(maxPoolBufferCap + 1)
assert.NotSame(t, unsafe.SliceData(b0.Bytes()), unsafe.SliceData(b1.Bytes()))
})
}

type fakePool struct {
maxBufferSize int
buffers [][]byte
maxBufferCap int
buffers [][]byte
}

func (p *fakePool) Get(sz int) []byte {
if sz <= p.maxBufferSize {
for i, b := range p.buffers {
if cap(b) < sz {
continue
}
p.buffers = append(p.buffers[:i], p.buffers[i+1:]...)
return b
}
func (p *fakePool) Get() []byte {
if len(p.buffers) > 0 {
buf := p.buffers[0]
p.buffers = p.buffers[1:]
return buf
}
return make([]byte, 0, sz)
return make([]byte, 0, defaultPoolBufferCap)
}

func (p *fakePool) Put(s []byte) {
if p.maxBufferCap > 0 && cap(s) > p.maxBufferCap {
return
}
p.buffers = append(p.buffers, s[:0])
}
2 changes: 1 addition & 1 deletion tools/trafficdump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const maxBufferPoolSize = 1024 * 1024

var bufferPool = util.NewBucketedBufferPool(1e3, maxBufferPoolSize, 2)
var bufferPool = util.NewBufferPool(maxBufferPoolSize)

type parser struct {
processorConfig processorConfig
Expand Down

0 comments on commit 3b074b1

Please sign in to comment.