diff --git a/dot/sync/service.go b/dot/sync/service.go
index 57814e6025..dec450a56f 100644
--- a/dot/sync/service.go
+++ b/dot/sync/service.go
@@ -279,21 +279,23 @@ func (s *SyncService) runStrategy() {
 		bestBlockHeader.Hash().Short(),
 	)
 
-	tasks, err := s.currentStrategy.NextActions()
-	if err != nil {
-		logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
-		return
-	}
+	// Only ask the strategy for more work when the pool can accept the whole
+	// round of tasks; otherwise skip this round and let pending results drain.
+	if s.workerPool.Capacity() > s.currentStrategy.NumOfTasks() {
+		tasks, err := s.currentStrategy.NextActions()
+		if err != nil {
+			logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
+			return
+		}
 
-	logger.Tracef("amount of tasks to process: %d", len(tasks))
-	if len(tasks) == 0 {
-		return
-	}
+		logger.Tracef("amount of tasks to process: %d", len(tasks))
+		if len(tasks) == 0 {
+			return
+		}
 
-	_, err = s.workerPool.SubmitBatch(tasks)
-	if err != nil {
-		logger.Criticalf("current sync strategy next actions failed with: %s", err.Error())
-		return
+		_, err = s.workerPool.SubmitBatch(tasks)
+		if err != nil {
+			logger.Criticalf("submitting batch to the worker pool failed with: %s", err.Error())
+			return
+		}
 	}
 
 	done, repChanges, peersToIgnore, err := s.currentStrategy.Process(s.workerPool.Results())
diff --git a/dot/sync/worker_pool.go b/dot/sync/worker_pool.go
index c406f054d4..b3f2371098 100644
--- a/dot/sync/worker_pool.go
+++ b/dot/sync/worker_pool.go
@@ -74,6 +74,7 @@ type WorkerPool interface {
 	SubmitBatch(tasks []Task) (id BatchID, err error)
 	GetBatch(id BatchID) (status BatchStatus, ok bool)
 	Results() chan TaskResult
+	Capacity() int
 	AddPeer(p peer.ID) error
 	RemovePeer(p peer.ID)
 	IgnorePeer(p peer.ID)
@@ -119,10 +120,6 @@ type workerPool struct {
 
 // SubmitBatch accepts a list of tasks and immediately returns a batch ID. The batch ID can be used to query the status
 // of the batch using [GetBatchStatus].
-// TODO
-// If tasks are submitted faster than they are completed, resChan will run full, blocking the calling goroutine.
-// Ideally this method would provide backpressure to the caller in that case. The rejected tasks should then stay in
-// FullSyncStrategy.requestQueue until the next round. But this would need to be supported in all sync strategies.
 func (w *workerPool) SubmitBatch(tasks []Task) (id BatchID, err error) {
 	w.mtx.Lock()
 	defer w.mtx.Unlock()
@@ -157,6 +154,13 @@ func (w *workerPool) Results() chan TaskResult {
 	return w.resChan
 }
 
+// Capacity returns the number of tasks the worker pool can currently accept,
+// i.e. the free slots remaining in the results channel.
+func (w *workerPool) Capacity() int {
+	w.mtx.RLock()
+	defer w.mtx.RUnlock()
+	return cap(w.resChan) - len(w.resChan)
+}
+
 // AddPeer adds a peer to the worker pool unless it has been ignored previously.
 func (w *workerPool) AddPeer(who peer.ID) error {
 	w.mtx.Lock()