diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index f90aa2695e63..d651379debe7 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -176,13 +176,13 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed) + initialProcessedBlock := s.latestProcessedBlock.Load() + currentProcessedBlock := initialProcessedBlock - latestProcessedBlock := s.latestProcessedBlock.Load() - updatedLatestProcessedBlock := latestProcessedBlock + log.Trace("Sync service fetch rollup events", "latest processed block", currentProcessedBlock, "latest confirmed", latestConfirmed) // query in batches - for from := latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := currentProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -204,10 +204,10 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - updatedLatestProcessedBlock = to + currentProcessedBlock = to } - s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) + s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) } func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endBlockNumber uint64) error { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 9fc50d68a9c7..45854c8e9d3c 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -164,10 +164,10 @@ func (s *SyncService) fetchMessages() { return } - latestProcessedBlock := s.latestProcessedBlock.Load() - updatedLatestProcessedBlock := latestProcessedBlock + initialProcessedBlock := s.latestProcessedBlock.Load() + currentProcessedBlock := initialProcessedBlock - log.Trace("Sync service fetchMessages", "latestProcessedBlock", latestProcessedBlock, "latestConfirmed", latestConfirmed) + log.Trace("Sync service fetchMessages", "latestProcessedBlock", currentProcessedBlock, "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -197,7 +197,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - updatedLatestProcessedBlock = lastBlock + currentProcessedBlock = lastBlock } // ticker for logging progress @@ -205,7 +205,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := updatedLatestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := currentProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -214,8 +214,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(updatedLatestProcessedBlock) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", updatedLatestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(currentProcessedBlock) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", currentProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: } @@ -260,5 +260,5 @@ func (s *SyncService) fetchMessages() { } } - s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) + s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) }