Skip to content

Commit

Permalink
feat(dot/sync): make worker pool capacity depend on strategy config
Browse files Browse the repository at this point in the history
  • Loading branch information
haikoschol committed Oct 24, 2024
1 parent bd32800 commit 07358f2
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 18 deletions.
12 changes: 10 additions & 2 deletions dot/sync/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@

package sync

import "time"
import (
"time"
)

type ServiceConfig func(svc *SyncService)

func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.currentStrategy = currentStrategy
svc.defaultStrategy = defaultStrategy

wpCapacity := max(currentStrategy.NumOfTasks(), defaultStrategy.NumOfTasks()) * 2 // add some buffer

svc.workerPool = NewWorkerPool(WorkerPoolConfig{
MaxRetries: maxTaskRetries,
Capacity: wpCapacity,
})
}
}

func WithNetwork(net Network) ServiceConfig {
return func(svc *SyncService) {
svc.network = net
//svc.workerPool = newSyncWorkerPool(net)
}
}

Expand Down
4 changes: 4 additions & 0 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ func (f *FullSyncStrategy) IsSynced() bool {
return uint32(highestBlock)+messages.MaxBlocksInResponse >= f.peers.getTarget() //nolint:gosec
}

func (f *FullSyncStrategy) NumOfTasks() int {
return f.numOfTasks
}

type RequestResponseData struct {
req *messages.BlockRequestMessage
responseData []*types.BlockData
Expand Down
7 changes: 2 additions & 5 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Strategy interface {
Process(results <-chan TaskResult) (done bool, repChanges []Change, blocks []peer.ID, err error)
ShowMetrics()
IsSynced() bool
NumOfTasks() int
}

type SyncService struct {
Expand Down Expand Up @@ -120,11 +121,7 @@ func NewSyncService(cfgs ...ServiceConfig) *SyncService {
waitPeersDuration: waitPeersDefaultTimeout,
stopCh: make(chan struct{}),
seenBlockSyncRequests: lrucache.NewLRUCache[common.Hash, uint](100),
workerPool: NewWorkerPool(WorkerPoolConfig{
MaxRetries: maxTaskRetries,
// TODO: This should depend on the actual configuration of the currently used sync strategy.
Capacity: defaultNumOfTasks * 10,
}),
workerPool: nil,
}

for _, cfg := range cfgs {
Expand Down
10 changes: 5 additions & 5 deletions dot/sync/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type TaskResult struct {
Completed bool
Result Result
Error error
Retries uint
Retries int
Who peer.ID
}

Expand Down Expand Up @@ -82,15 +82,15 @@ type WorkerPool interface {
}

type WorkerPoolConfig struct {
Capacity uint
MaxRetries uint
Capacity int
MaxRetries int
}

// NewWorkerPool creates a new worker pool with the given configuration.
func NewWorkerPool(cfg WorkerPoolConfig) WorkerPool {
ctx, cancel := context.WithCancel(context.Background())

if cfg.Capacity == 0 {
if cfg.Capacity <= 0 {
cfg.Capacity = defaultWorkerPoolCapacity
}

Expand All @@ -108,7 +108,7 @@ type workerPool struct {
mtx sync.RWMutex
wg sync.WaitGroup

maxRetries uint
maxRetries int
peers list.List
ignoredPeers map[peer.ID]struct{}
statuses map[BatchID]BatchStatus
Expand Down
12 changes: 6 additions & 6 deletions dot/sync/worker_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestWorkerPoolHappyPath(t *testing.T) {
result := <-wp.Results()
assert.True(t, result.Completed)
assert.False(t, result.Failed())
assert.Equal(t, uint(0), result.Retries)
assert.Equal(t, 0, result.Retries)

results = append(results, result)
if len(results) == numTasks {
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestWorkerPoolTaskFailures(t *testing.T) {
numTasks := 3
taskErr := errors.New("kaput")

setup := func(maxRetries uint) (failOnce *mockTask, failTwice *mockTask, batchID BatchID, wp WorkerPool) {
setup := func(maxRetries int) (failOnce *mockTask, failTwice *mockTask, batchID BatchID, wp WorkerPool) {
tasks, peers := makeTasksAndPeers(numTasks, 0)

failOnce = tasks[1].(*mockTask)
Expand Down Expand Up @@ -206,10 +206,10 @@ func TestWorkerPoolTaskFailures(t *testing.T) {
assert.Equal(t, 0, len(status.Failed))

assert.Nil(t, status.Failed[failOnce.ID()].Error)
assert.Equal(t, uint(1), status.Success[failOnce.ID()].Retries)
assert.Equal(t, 1, status.Success[failOnce.ID()].Retries)

assert.Nil(t, status.Failed[failTwice.ID()].Error)
assert.Equal(t, uint(2), status.Success[failTwice.ID()].Retries)
assert.Equal(t, 2, status.Success[failTwice.ID()].Retries)
})

t.Run("honours_max_retries", func(t *testing.T) {
Expand All @@ -223,10 +223,10 @@ func TestWorkerPoolTaskFailures(t *testing.T) {
assert.Equal(t, 1, len(status.Failed))

assert.Nil(t, status.Failed[failOnce.ID()].Error)
assert.Equal(t, uint(1), status.Success[failOnce.ID()].Retries)
assert.Equal(t, 1, status.Success[failOnce.ID()].Retries)

assert.ErrorIs(t, taskErr, status.Failed[failTwice.ID()].Error)
assert.Equal(t, uint(1), status.Failed[failTwice.ID()].Retries)
assert.Equal(t, 1, status.Failed[failTwice.ID()].Retries)
})
}

Expand Down

0 comments on commit 07358f2

Please sign in to comment.