Skip to content

Commit

Permalink
Merge pull request #292 from sylwiaszunejko/debounce_fill
Browse files Browse the repository at this point in the history
Start debouncing pool.fill() calls
  • Loading branch information
dkropachev authored Oct 7, 2024
2 parents f9eeb86 + 0422162 commit d76e5b3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 4 deletions.
14 changes: 11 additions & 3 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net"
"sync"
"time"

"github.com/gocql/gocql/debounce"
)

// interface to implement to receive the host information
Expand Down Expand Up @@ -203,7 +205,7 @@ func (p *policyConnPool) addHost(host *HostInfo) {
}
p.mu.Unlock()

pool.fill()
pool.fill_debounce()
}

func (p *policyConnPool) removeHost(hostID string) {
Expand Down Expand Up @@ -232,6 +234,7 @@ type hostConnPool struct {
connPicker ConnPicker
closed bool
filling bool
debouncer *debounce.SimpleDebouncer

logger StdLogger
}
Expand All @@ -256,6 +259,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
filling: false,
closed: false,
logger: session.logger,
debouncer: debounce.NewSimpleDebouncer(),
}

// the pool is not filled or connected
Expand All @@ -274,7 +278,7 @@ func (pool *hostConnPool) Pick(token Token, qry ExecutableQuery) *Conn {
size, missing := pool.connPicker.Size()
if missing > 0 {
// try to fill the pool
go pool.fill()
go pool.fill_debounce()

if size == 0 {
return nil
Expand Down Expand Up @@ -385,6 +389,10 @@ func (pool *hostConnPool) fill() {
}()
}

func (pool *hostConnPool) fill_debounce() {
pool.debouncer.Debounce(pool.fill)
}

func (pool *hostConnPool) logConnectErr(err error) {
if opErr, ok := err.(*net.OpError); ok && (opErr.Op == "dial" || opErr.Op == "read") {
// connection refused
Expand Down Expand Up @@ -551,5 +559,5 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
}

pool.connPicker.Remove(conn)
go pool.fill()
go pool.fill_debounce()
}
25 changes: 25 additions & 0 deletions debounce/simple_debouncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package debounce

// SimpleDebouncer debounce function call with simple logc:
// 1. If call is currently pending, function call should go through
// 2. If call is scheduled, but not pending, function call should be voided
type SimpleDebouncer struct {
channel chan struct{}
}

// NewDebouncer creates a new Debouncer with a buffered channel of size 1
func NewSimpleDebouncer() *SimpleDebouncer {
return &SimpleDebouncer{
channel: make(chan struct{}, 1),
}
}

// Debounce attempts to execute the function if the channel allows it
func (d *SimpleDebouncer) Debounce(fn func()) {
select {
case d.channel <- struct{}{}:
fn()
<-d.channel
default:
}
}
55 changes: 55 additions & 0 deletions debounce/simple_debouncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package debounce

import (
"runtime"
"sync/atomic"
"testing"
)

// TestDebouncer tests that the debouncer allows only one function to execute at a time
func TestSimpleDebouncer(t *testing.T) {
d := NewSimpleDebouncer()
var executions int32
startedCh := make(chan struct{}, 1)
doneCh := make(chan struct{}, 1)
// Function to increment executions
fn := func() {
<-startedCh // Simulate work
atomic.AddInt32(&executions, 1)
<-doneCh // Simulate work
}
t.Run("Case 1", func(t *testing.T) {
// Case 1: Normal single execution
startedCh <- struct{}{}
doneCh <- struct{}{}
d.Debounce(fn)
// We expect that the function has only executed once due to debouncing
if atomic.LoadInt32(&executions) != 1 {
t.Errorf("Expected function to be executed only once, but got %d executions", executions)
}
})
atomic.StoreInt32(&executions, 0)
t.Run("Case 2", func(t *testing.T) {
// Case 2: Debounce the function multiple times at row when body is started
go d.Debounce(fn)
startedCh <- struct{}{}
go d.Debounce(fn)
go d.Debounce(fn)
doneCh <- struct{}{}
startedCh <- struct{}{}
doneCh <- struct{}{}
waitTillChannelIsEmpty(doneCh)
// We expect that the function has only executed once due to debouncing
if atomic.LoadInt32(&executions) != 2 {
t.Errorf("Expected function to be executed twice, but got %d executions", executions)
}
})
}
func waitTillChannelIsEmpty(ch chan struct{}) {
for {
if len(ch) == 0 {
return
}
runtime.Gosched()
}
}
2 changes: 1 addition & 1 deletion scylla_shard_aware_port_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func triggerPoolsRefill(sess *Session) {
hosts := sess.ring.allHosts()
for _, host := range hosts {
hostPool, _ := sess.pool.getPool(host)
go hostPool.fill()
go hostPool.fill_debounce()
}
}

Expand Down

0 comments on commit d76e5b3

Please sign in to comment.