From 0a73818582bda1413c0132464dc22e9c145bb396 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Thu, 19 Sep 2024 14:11:34 +0300 Subject: [PATCH 1/4] feat(txpool): only try demoting txns from accounts that were active txpool demotes pending txns only if their nonce is now lower than the nonce in the latest state or the account no longer has enough funds to cover the costs. Unless the account in question was active since the last txpool reorg, there is no way that it's nonce changed or balance decreased. --- core/tx_pool.go | 41 +++++++++++++++++++++++++++++++---------- core/tx_pool_test.go | 2 +- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index e7d5062fd523..41b253e70227 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1308,9 +1308,10 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt promoteAddrs = dirtyAccounts.flatten() } pool.mu.Lock() + var affectedAccounts map[common.Address]bool if reset != nil { // Reset from the old head to the new, rescheduling any reorged transactions - pool.reset(reset.oldHead, reset.newHead) + affectedAccounts = pool.reset(reset.oldHead, reset.newHead) // Nonces were reset, discard any events that became stale for addr := range events { @@ -1332,7 +1333,7 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). if reset != nil { - pool.demoteUnexecutables() + pool.demoteUnexecutables(affectedAccounts) if reset.newHead != nil && pool.chainconfig.IsCurie(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { l1BaseFee := fees.GetL1BaseFee(pool.currentState) pendingBaseFee := misc.CalcBaseFee(pool.chainconfig, reset.newHead, l1BaseFee) @@ -1380,9 +1381,18 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. -func (pool *TxPool) reset(oldHead, newHead *types.Header) { +func (pool *TxPool) reset(oldHead, newHead *types.Header) map[common.Address]bool { // If we're reorging an old state, reinject all dropped transactions var reinject types.Transactions + affectedAccounts := make(map[common.Address]bool) + collectAffectedAccounts := func(txs types.Transactions) { + if affectedAccounts != nil { + for _, tx := range txs { + addr, _ := types.Sender(pool.signer, tx) + affectedAccounts[addr] = true + } + } + } if oldHead != nil && oldHead.Hash() != newHead.ParentHash { // If the reorg is too deep, avoid doing it (will happen during fast sync) @@ -1391,6 +1401,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { log.Debug("Skipping deep transaction reorg", "depth", depth) + affectedAccounts = nil // do a deep txPool reorg } else { // Reorg seems shallow enough to pull in all transactions into memory var discarded, included types.Transactions @@ -1407,7 +1418,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { // If we reorged to a same or higher number, then it's not a case of setHead log.Warn("Transaction pool reset with missing oldhead", "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) - return + return nil } // If the reorg ended up on a lower number, it's indicative of setHead being the cause log.Debug("Skipping transaction reset caused by setHead", @@ -1418,44 +1429,48 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return + return nil } } for add.NumberU64() > rem.NumberU64() { included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return + return nil } } for rem.Hash() != add.Hash() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return + return nil } included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return + return nil } } reinject = types.TxDifference(discarded, included) + collectAffectedAccounts(discarded) + collectAffectedAccounts(included) } } } // Initialize the internal state to the current head if newHead == nil { + affectedAccounts = nil newHead = pool.chain.CurrentBlock().Header() // Special case during testing } statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { log.Error("Failed to reset txpool state", "err", err) - return + return nil } pool.currentState = statedb pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit + collectAffectedAccounts(pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()).Transactions()) // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) @@ -1472,6 +1487,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { // Update current head pool.currentHead = next + return affectedAccounts } // promoteExecutables moves transactions that have become processable from the @@ -1706,9 +1722,14 @@ func (pool *TxPool) truncateQueue() { // Note: transactions are not marked as removed in the priced list because re-heaping // is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful // to trigger a re-heap is this function -func (pool *TxPool) demoteUnexecutables() { +func (pool *TxPool) demoteUnexecutables(affectedAccounts map[common.Address]bool) { + log.Info("Demoting unexecutable transactions", "affected", len(affectedAccounts)) // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { + if affectedAccounts != nil && !affectedAccounts[addr] { + continue + } + nonce := pool.currentState.GetNonce(addr) // Drop all transactions that are deemed too old (low nonce) diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 525deda66b9b..3d5172dc00d7 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -2463,7 +2463,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables() + pool.demoteUnexecutables(nil) } } From 3112cc14e5bf4562cd6015a3a5c557fb0965621b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Mon, 23 Sep 2024 08:42:35 +0300 Subject: [PATCH 2/4] remove pause/resume reorg --- core/tx_pool.go | 24 +----------------------- miner/scroll_worker.go | 7 ------- 2 files changed, 1 insertion(+), 30 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 41b253e70227..281168114c9a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -280,7 +280,6 @@ type TxPool struct { queueTxEventCh chan *types.Transaction reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop - reorgPauseCh chan bool // requests to pause scheduleReorgLoop realTxActivityShutdownCh chan struct{} wg sync.WaitGroup // tracks loop, scheduleReorgLoop initDoneCh chan struct{} // is closed once the pool is initialized (for tests) @@ -317,7 +316,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block reorgDoneCh: make(chan chan struct{}), reorgShutdownCh: make(chan struct{}), realTxActivityShutdownCh: make(chan struct{}), - reorgPauseCh: make(chan bool), initDoneCh: make(chan struct{}), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } @@ -1229,14 +1227,13 @@ func (pool *TxPool) scheduleReorgLoop() { curDone chan struct{} // non-nil while runReorg is active nextDone = make(chan struct{}) launchNextRun bool - reorgsPaused bool reset *txpoolResetRequest dirtyAccounts *accountSet queuedEvents = make(map[common.Address]*txSortedMap) ) for { // Launch next background reorg if needed - if curDone == nil && launchNextRun && !reorgsPaused { + if curDone == nil && launchNextRun { // Run the background reorg and announcements go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) @@ -1288,7 +1285,6 @@ func (pool *TxPool) scheduleReorgLoop() { } close(nextDone) return - case reorgsPaused = <-pool.reorgPauseCh: } } } @@ -1793,24 +1789,6 @@ func (pool *TxPool) calculateTxsLifecycle(txs types.Transactions, t time.Time) { } } -// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight -// Keep in mind this function might block, although it is not expected to block for any significant amount of time -func (pool *TxPool) PauseReorgs() { - select { - case pool.reorgPauseCh <- true: - case <-pool.reorgShutdownCh: - } -} - -// ResumeReorgs allows new reorg jobs to be started. -// Keep in mind this function might block, although it is not expected to block for any significant amount of time -func (pool *TxPool) ResumeReorgs() { - select { - case pool.reorgPauseCh <- false: - case <-pool.reorgShutdownCh: - } -} - // addressByHeartbeat is an account address tagged with its last activity timestamp. type addressByHeartbeat struct { address common.Address diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go index 02f331bc200a..8f7c76c69025 100644 --- a/miner/scroll_worker.go +++ b/miner/scroll_worker.go @@ -569,9 +569,6 @@ func (w *worker) processTxPool() (bool, error) { // Fill the block with all available pending transactions. pending := w.eth.TxPool().PendingWithMax(false, w.config.MaxAccountsNum) - // Allow txpool to be reorged as we build current block - w.eth.TxPool().ResumeReorgs() - // Split the pending transactions into locals and remotes localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending for _, account := range w.eth.TxPool().Locals() { @@ -892,10 +889,6 @@ func (w *worker) commit() (common.Hash, error) { } } - // A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block. - // We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check - w.eth.TxPool().PauseReorgs() - // Commit block and state to database. _, err = w.chain.WriteBlockWithState(block, w.current.receipts, w.current.coalescedLogs, w.current.state, true) if err != nil { From 83edfe1319bcb93460f200a404a3ebb8a9223db5 Mon Sep 17 00:00:00 2001 From: omerfirmak Date: Mon, 23 Sep 2024 05:46:39 +0000 Subject: [PATCH 3/4] =?UTF-8?q?chore:=20auto=20version=20bump=E2=80=89[bot?= =?UTF-8?q?]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index dec887edfcb2..47191375efc0 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 7 // Minor version component of the current release - VersionPatch = 21 // Patch version component of the current release + VersionPatch = 22 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string ) From 5a750da861de09a281d00ad1f249c1c49ec8de16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Tue, 24 Sep 2024 10:29:33 +0300 Subject: [PATCH 4/4] discarded->reinject --- core/tx_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 281168114c9a..28943e009629 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1448,7 +1448,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) map[common.Address]boo } } reinject = types.TxDifference(discarded, included) - collectAffectedAccounts(discarded) + collectAffectedAccounts(reinject) collectAffectedAccounts(included) } }