Skip to content

Commit

Permalink
add a lock to protect latestProcessedBlock and db state
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Sep 23, 2024
1 parent b6670b5 commit c588232
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 31 deletions.
24 changes: 8 additions & 16 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const (

// RollupSyncService collects ScrollChain batch commit/revert/finalize events and stores metadata into db.
type RollupSyncService struct {
originalCtx context.Context
ctx context.Context
cancel context.CancelFunc
client *L1Client
Expand All @@ -66,8 +65,7 @@ type RollupSyncService struct {
bc *core.BlockChain
stack *node.Node

stateMu sync.Mutex // protects the service state
resetMu sync.Mutex // protects critical sections during reset operation
stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates
}

func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) {
Expand Down Expand Up @@ -107,7 +105,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
serviceCtx, cancel := context.WithCancel(ctx)

service := RollupSyncService{
originalCtx: ctx,
ctx: serviceCtx,
cancel: cancel,
client: client,
Expand Down Expand Up @@ -175,24 +172,19 @@ func (s *RollupSyncService) ResetToHeight(height uint64) {
return
}

s.resetMu.Lock()
defer s.resetMu.Unlock()

s.Stop()

newCtx, newCancel := context.WithCancel(s.originalCtx)
s.ctx = newCtx
s.cancel = newCancel
s.latestProcessedBlock = height
s.stateMu.Lock()
defer s.stateMu.Unlock()

rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height)
s.latestProcessedBlock = height

log.Info("Reset rollup sync service", "height", height)

go s.Start()
log.Info("Reset sync service", "height", height)
}

func (s *RollupSyncService) fetchRollupEvents() {
s.stateMu.Lock()
defer s.stateMu.Unlock()

latestConfirmed, err := s.client.getLatestFinalizedBlockNumber()
if err != nil {
log.Warn("failed to get latest confirmed block number", "err", err)
Expand Down
22 changes: 7 additions & 15 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ var (

// SyncService collects all L1 messages and stores them in a local database.
type SyncService struct {
originalCtx context.Context
ctx context.Context
cancel context.CancelFunc
client *BridgeClient
Expand All @@ -53,8 +52,7 @@ type SyncService struct {
latestProcessedBlock uint64
scope event.SubscriptionScope

stateMu sync.Mutex // protects the service state
resetMu sync.Mutex // protects critical sections during reset operation
stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates
}

func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) {
Expand Down Expand Up @@ -84,7 +82,6 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node
serviceCtx, cancel := context.WithCancel(ctx)

service := SyncService{
originalCtx: ctx,
ctx: serviceCtx,
cancel: cancel,
client: client,
Expand Down Expand Up @@ -157,21 +154,13 @@ func (s *SyncService) ResetToHeight(height uint64) {
return
}

s.resetMu.Lock()
defer s.resetMu.Unlock()

s.Stop()

newCtx, newCancel := context.WithCancel(s.originalCtx)
s.ctx = newCtx
s.cancel = newCancel
s.latestProcessedBlock = height
s.stateMu.Lock()
defer s.stateMu.Unlock()

rawdb.WriteSyncedL1BlockNumber(s.db, height)
s.latestProcessedBlock = height

log.Info("Reset sync service", "height", height)

go s.Start()
}

// SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and
Expand All @@ -181,6 +170,9 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve
}

func (s *SyncService) fetchMessages() {
s.stateMu.Lock()
defer s.stateMu.Unlock()

latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err != nil {
log.Warn("Failed to get latest confirmed block number", "err", err)
Expand Down

0 comments on commit c588232

Please sign in to comment.