From c93e97908d84cac6b85481915efd04060b642879 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 27 Oct 2023 12:09:54 +1000 Subject: [PATCH 01/20] error fix: right tx position.BlockOffset --- gossip/c_block_callbacks.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index d4fe3fea2..e463bf3d4 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -319,7 +319,6 @@ func consensusCallbackBeginBlockFn( for _, e := range blockEvents { txs = append(txs, e.Txs()...) } - _ = evmProcessor.Execute(txs) evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize() @@ -345,12 +344,12 @@ func consensusCallbackBeginBlockFn( } } // memorize block position of each tx - for i, tx := range evmBlock.Transactions { + for _, r := range allReceipts { // not skipped txs only - position := txPositions[tx.Hash()] + position := txPositions[r.TxHash] position.Block = blockCtx.Idx - position.BlockOffset = uint32(i) - txPositions[tx.Hash()] = position + position.BlockOffset = uint32(r.TransactionIndex) + txPositions[r.TxHash] = position } // call OnNewReceipt From ca803a4bbb1fdb91762b4ae3f513e589ddd4a750 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 27 Oct 2023 14:35:48 +1000 Subject: [PATCH 02/20] data migration to fix db --- gossip/store_migration.go | 56 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/gossip/store_migration.go b/gossip/store_migration.go index 5c46b0e88..f310f978a 100644 --- a/gossip/store_migration.go +++ b/gossip/store_migration.go @@ -3,11 +3,15 @@ package gossip import ( "errors" "fmt" + "time" "github.com/Fantom-foundation/lachesis-base/hash" "github.com/Fantom-foundation/lachesis-base/kvdb" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + "github.com/Fantom-foundation/go-opera/gossip/evmstore" "github.com/Fantom-foundation/go-opera/inter" "github.com/Fantom-foundation/go-opera/inter/iblockproc" "github.com/Fantom-foundation/go-opera/utils/migration" @@ -28,6 +32,9 @@ func (s *Store) migrateData() error { } err := s.migrations().Exec(versions, s.flushDBs) + if err != nil { + panic(err) + } return err } @@ -43,7 +50,8 @@ func (s *Store) migrations() *migration.Migration { Next("erase gossip-async db", s.eraseGossipAsyncDB). Next("erase SFC API table", s.eraseSfcApiTable). Next("erase legacy genesis DB", s.eraseGenesisDB). - Next("calculate upgrade heights", s.calculateUpgradeHeights) + Next("calculate upgrade heights", s.calculateUpgradeHeights). + Next("EVM TxPosition.BlockOffset fix", s.fixTxPositionBlockOffset) } func unsupportedMigration() error { @@ -141,3 +149,49 @@ func (s *Store) calculateUpgradeHeights() error { } return nil } + +func (s *Store) fixTxPositionBlockOffset() error { + var ( + start = time.Now() + processed = 0 + ) + + receiptsTable, _ := s.dbs.OpenDB("evm/r") + txPositionsTable, _ := s.dbs.OpenDB("evm/x") + + it := receiptsTable.NewIterator(nil, nil) + defer it.Release() + for it.Next() { + + var receiptsStorage []*types.ReceiptForStorage + err := rlp.DecodeBytes(it.Value(), &receiptsStorage) + if err != nil { + s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value())) + } + + var pos = new(evmstore.TxPosition) + + for _, r := range receiptsStorage { + processed++ + key := r.TxHash.Bytes() + + got := s.rlp.Get(txPositionsTable, key, pos) + if got == nil { + continue + } + pos.BlockOffset = uint32(r.TransactionIndex) + s.rlp.Set(txPositionsTable, key, pos) + + if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 { + err = s.flushDBs() + if err != nil { + return err + } + s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "done", processed) + } + } + } + s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "done", processed) + + return nil +} From 2ddbac41ff7abc4b8fd7a02ed6cc01c37d82505a Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 28 Oct 2023 20:17:20 +1000 Subject: [PATCH 03/20] optimization: union the same loops --- gossip/c_block_callbacks.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index e463bf3d4..cf026f043 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -343,18 +343,16 @@ func consensusCallbackBeginBlockFn( } } } - // memorize block position of each tx - for _, r := range allReceipts { - // not skipped txs only + + for i, r := range allReceipts { + // memorize block position for not skipped txs only position := txPositions[r.TxHash] position.Block = blockCtx.Idx position.BlockOffset = uint32(r.TransactionIndex) txPositions[r.TxHash] = position - } - - // call OnNewReceipt - for i, r := range allReceipts { - creator := txPositions[r.TxHash].EventCreator + // call OnNewReceipt + creator := position.EventCreator + // TODO: is it check necessary? if creator != 0 && es.Validators.Get(creator) == 0 { creator = 0 } From 19c852c2f75a55a3f0abed20420b805aaaab32ad Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 28 Oct 2023 23:04:24 +1000 Subject: [PATCH 04/20] parallelized EVM TxPosition.BlockOffset migration Txs positions processing blocks=37776408 items=262,972,464 no thread: elapsed=50m0.817s 10 threads: elapsed=42m43.202s 100 threads: elapsed=42m37.924s --- gossip/store_migration.go | 79 ++++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 25 deletions(-) diff --git a/gossip/store_migration.go b/gossip/store_migration.go index f310f978a..b5b7ea213 100644 --- a/gossip/store_migration.go +++ b/gossip/store_migration.go @@ -3,9 +3,12 @@ package gossip import ( "errors" "fmt" + "sync" + "sync/atomic" "time" "github.com/Fantom-foundation/lachesis-base/hash" + "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/Fantom-foundation/lachesis-base/kvdb" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -150,48 +153,74 @@ func (s *Store) calculateUpgradeHeights() error { return nil } -func (s *Store) fixTxPositionBlockOffset() error { +func (s *Store) fixTxPositionBlockOffset() (err error) { + const parallels = 10 var ( - start = time.Now() - processed = 0 + wg sync.WaitGroup + items = new(uint32) ) receiptsTable, _ := s.dbs.OpenDB("evm/r") txPositionsTable, _ := s.dbs.OpenDB("evm/x") + processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) { + defer wg.Done() + pos := new(evmstore.TxPosition) + for rr := range input { + for _, r := range rr { + key := r.TxHash.Bytes() + got := s.rlp.Get(txPositionsTable, key, pos) + if got == nil { + continue + } + pos.BlockOffset = uint32(r.TransactionIndex) + s.rlp.Set(txPositionsTable, key, pos) + + atomic.AddUint32(items, 1) + } + } + } + + wg.Add(parallels) + threads := make([]chan []*types.ReceiptForStorage, parallels) + for i := range threads { + threads[i] = make(chan []*types.ReceiptForStorage, 10) + go processBlockReceipts(threads[i]) + } + + var ( + block idx.Block + start = time.Now() + prevFlushTime = time.Now() + ) it := receiptsTable.NewIterator(nil, nil) defer it.Release() - for it.Next() { + for n := 0; it.Next(); n++ { + block = idx.BytesToBlock(it.Key()) var receiptsStorage []*types.ReceiptForStorage err := rlp.DecodeBytes(it.Value(), &receiptsStorage) if err != nil { s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value())) } + threads[n%parallels] <- receiptsStorage - var pos = new(evmstore.TxPosition) - - for _, r := range receiptsStorage { - processed++ - key := r.TxHash.Bytes() - - got := s.rlp.Get(txPositionsTable, key, pos) - if got == nil { - continue - } - pos.BlockOffset = uint32(r.TransactionIndex) - s.rlp.Set(txPositionsTable, key, pos) - - if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 { - err = s.flushDBs() - if err != nil { - return err - } - s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "done", processed) + if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod { + prevFlushTime = time.Now() + err = s.flushDBs() + if err != nil { + break } + s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", atomic.LoadUint32(items)) } } - s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "done", processed) + for i := range threads { + close(threads[i]) + } + wg.Wait() + // no need to flush dbs at end as it migration engine does - return nil + s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", *items) + + return } From f89d9bf40ca7b9defbff7caa9afe86a0ac3fea02 Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 31 Oct 2023 13:30:08 +1000 Subject: [PATCH 05/20] refactor: processBlocksRange() goroutine --- gossip/store_migration.go | 97 ++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 37 deletions(-) diff --git a/gossip/store_migration.go b/gossip/store_migration.go index b5b7ea213..e7dce9bbd 100644 --- a/gossip/store_migration.go +++ b/gossip/store_migration.go @@ -8,7 +8,6 @@ import ( "time" "github.com/Fantom-foundation/lachesis-base/hash" - "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/Fantom-foundation/lachesis-base/kvdb" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -154,15 +153,14 @@ func (s *Store) calculateUpgradeHeights() error { } func (s *Store) fixTxPositionBlockOffset() (err error) { - const parallels = 10 + receiptsTable, _ := s.dbs.OpenDB("evm/r") + txPositionsTable, _ := s.dbs.OpenDB("evm/x") + + // for each block's receipts var ( wg sync.WaitGroup items = new(uint32) ) - - receiptsTable, _ := s.dbs.OpenDB("evm/r") - txPositionsTable, _ := s.dbs.OpenDB("evm/x") - processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) { defer wg.Done() pos := new(evmstore.TxPosition) @@ -181,46 +179,71 @@ func (s *Store) fixTxPositionBlockOffset() (err error) { } } - wg.Add(parallels) - threads := make([]chan []*types.ReceiptForStorage, parallels) - for i := range threads { - threads[i] = make(chan []*types.ReceiptForStorage, 10) - go processBlockReceipts(threads[i]) - } - + // for each block var ( - block idx.Block - start = time.Now() - prevFlushTime = time.Now() + blocks = new(uint32) ) - it := receiptsTable.NewIterator(nil, nil) - defer it.Release() - for n := 0; it.Next(); n++ { - block = idx.BytesToBlock(it.Key()) - - var receiptsStorage []*types.ReceiptForStorage - err := rlp.DecodeBytes(it.Value(), &receiptsStorage) - if err != nil { - s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value())) + processBlocksRange := func() { + defer wg.Done() + const ( + parallels = 10 + ) + wg.Add(parallels) + threads := make([]chan []*types.ReceiptForStorage, parallels) + for i := range threads { + threads[i] = make(chan []*types.ReceiptForStorage, 10) + go processBlockReceipts(threads[i]) } - threads[n%parallels] <- receiptsStorage - if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod { - prevFlushTime = time.Now() - err = s.flushDBs() + it := receiptsTable.NewIterator(nil, nil) + defer it.Release() + for n := 0; it.Next(); n++ { + atomic.AddUint32(blocks, 1) + + var receiptsStorage []*types.ReceiptForStorage + err := rlp.DecodeBytes(it.Value(), &receiptsStorage) if err != nil { - break + s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value())) } - s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", atomic.LoadUint32(items)) + threads[n%parallels] <- receiptsStorage + } + for i := range threads { + close(threads[i]) } } - for i := range threads { - close(threads[i]) - } - wg.Wait() - // no need to flush dbs at end as it migration engine does - s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", *items) + // status log + var ( + done = make(chan struct{}) + ) + go func() { + var ( + start = time.Now() + prevFlushTime = time.Now() + ) + for again := true; again; { + select { + case <-time.After(s.cfg.MaxNonFlushedPeriod / 5): + again = true + case <-done: + again = false + } + s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "blocks", atomic.LoadUint32(blocks), "items", atomic.LoadUint32(items)) + if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod { + prevFlushTime = time.Now() + err = s.flushDBs() + if err != nil { + break + } + } + } + }() + + // main start + wg.Add(1) + go processBlocksRange() + wg.Wait() + close(done) return } From 8221efab517a3ac0742fb2d9988077ffd630ad54 Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 31 Oct 2023 17:00:01 +1000 Subject: [PATCH 06/20] parallel processBlocksRange() run --- gossip/store_migration.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/gossip/store_migration.go b/gossip/store_migration.go index e7dce9bbd..3d94ea824 100644 --- a/gossip/store_migration.go +++ b/gossip/store_migration.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Fantom-foundation/lachesis-base/hash" + "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/Fantom-foundation/lachesis-base/kvdb" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -153,6 +154,9 @@ func (s *Store) calculateUpgradeHeights() error { } func (s *Store) fixTxPositionBlockOffset() (err error) { + const ( + parallels = 10 + ) receiptsTable, _ := s.dbs.OpenDB("evm/r") txPositionsTable, _ := s.dbs.OpenDB("evm/x") @@ -183,11 +187,8 @@ func (s *Store) fixTxPositionBlockOffset() (err error) { var ( blocks = new(uint32) ) - processBlocksRange := func() { + processBlocksRange := func(from, to idx.Block) { defer wg.Done() - const ( - parallels = 10 - ) wg.Add(parallels) threads := make([]chan []*types.ReceiptForStorage, parallels) for i := range threads { @@ -195,9 +196,12 @@ func (s *Store) fixTxPositionBlockOffset() (err error) { go processBlockReceipts(threads[i]) } - it := receiptsTable.NewIterator(nil, nil) + it := receiptsTable.NewIterator(nil, from.Bytes()) defer it.Release() for n := 0; it.Next(); n++ { + if idx.BytesToBlock(it.Key()) > to { + break + } atomic.AddUint32(blocks, 1) var receiptsStorage []*types.ReceiptForStorage @@ -216,6 +220,7 @@ func (s *Store) fixTxPositionBlockOffset() (err error) { var ( done = make(chan struct{}) ) + defer close(done) go func() { var ( start = time.Now() @@ -239,11 +244,25 @@ func (s *Store) fixTxPositionBlockOffset() (err error) { } }() + // params + firstBlock := func() idx.Block { + it := receiptsTable.NewIterator(nil, nil) + defer it.Release() + if it.Next() { + return idx.BytesToBlock(it.Key()) + } + return 0 + }() + lastBlock := s.GetBlockState().LastBlock.Idx + // main start - wg.Add(1) - go processBlocksRange() + s.Log.Debug("processBlocksRange", "from", firstBlock, "to", lastBlock) + step := (lastBlock - firstBlock) / parallels + wg.Add(parallels + 1) + for i := idx.Block(0); i <= parallels; i++ { + go processBlocksRange(firstBlock+i*step, firstBlock+(i+1)*step-1) + } wg.Wait() - close(done) return } From fb950175a93699a892847d143f67c6678c5d1bfb Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 10 Nov 2023 13:25:46 +1000 Subject: [PATCH 07/20] dummyTxPool.Map() fix --- gossip/dummy_tx_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/dummy_tx_pool.go b/gossip/dummy_tx_pool.go index b9397ceeb..df29d1144 100644 --- a/gossip/dummy_tx_pool.go +++ b/gossip/dummy_tx_pool.go @@ -87,7 +87,7 @@ func (p *dummyTxPool) Map() map[common.Hash]*types.Transaction { for _, tx := range p.pool { res[tx.Hash()] = tx } - return nil + return res } func (p *dummyTxPool) Get(txid common.Hash) *types.Transaction { From 68acfd407e4a78074eeae82f0d56ada064b15ada Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 11 Nov 2023 00:55:38 +1000 Subject: [PATCH 08/20] dummy txpool fix --- gossip/dummy_tx_pool.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gossip/dummy_tx_pool.go b/gossip/dummy_tx_pool.go index df29d1144..f1bdb2280 100644 --- a/gossip/dummy_tx_pool.go +++ b/gossip/dummy_tx_pool.go @@ -159,6 +159,10 @@ func (p *dummyTxPool) Clear() { func (p *dummyTxPool) Delete(needle common.Hash) { p.lock.Lock() defer p.lock.Unlock() + + if len(p.pool) < 1 { + return + } notErased := make([]*types.Transaction, 0, len(p.pool)-1) for _, tx := range p.pool { if tx.Hash() != needle { From 1e1f59eb361891c3202e9f7005fe8241735578bd Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 12 Nov 2023 14:02:54 +1000 Subject: [PATCH 09/20] simpler dummy txpool --- gossip/common_test.go | 6 +- gossip/dummy_tx_pool.go | 211 ++++++++++++++++++++-------------------- 2 files changed, 108 insertions(+), 109 deletions(-) diff --git a/gossip/common_test.go b/gossip/common_test.go index 5d2c4b82c..ba8b472bd 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -155,14 +155,16 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator) *testEnv { engine, vecClock := makeTestEngine(store) // create the service - txPool := &dummyTxPool{} + txPool := newDummyTxPool() env.Service, err = newService(DefaultConfig(cachescale.Identity), store, blockProc, engine, vecClock, func(_ evmcore.StateReader) TxPool { return txPool }) if err != nil { panic(err) } - txPool.signer = env.EthAPI.signer + + txPool.Signer = env.Service.EthAPI.signer + err = engine.Bootstrap(env.GetConsensusCallbacks()) if err != nil { panic(err) diff --git a/gossip/dummy_tx_pool.go b/gossip/dummy_tx_pool.go index f1bdb2280..763d6cca3 100644 --- a/gossip/dummy_tx_pool.go +++ b/gossip/dummy_tx_pool.go @@ -1,7 +1,6 @@ package gossip import ( - "math/rand" "sort" "sync" @@ -14,13 +13,17 @@ import ( // dummyTxPool is a fake, helper transaction pool for testing purposes type dummyTxPool struct { - txFeed notify.Feed - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + lock sync.RWMutex // Protects the transaction pool + Signer types.Signer - signer types.Signer + index map[common.Hash]common.Address + pending map[common.Address]types.Transactions +} - lock sync.RWMutex // Protects the transaction pool +func newDummyTxPool() *dummyTxPool { + p := new(dummyTxPool) + p.Clear() + return p } // AddRemotes appends a batch of transactions to the pool, and notifies any @@ -29,35 +32,37 @@ func (p *dummyTxPool) AddRemotes(txs []*types.Transaction) []error { p.lock.Lock() defer p.lock.Unlock() - p.pool = append(p.pool, txs...) - if p.added != nil { - p.added <- txs + errs := make([]error, 0, len(txs)) + for _, tx := range txs { + txid := tx.Hash() + if _, ok := p.index[txid]; ok { + continue + } + from, err := types.Sender(p.Signer, tx) + if err == nil { + p.index[txid] = from + p.pending[from] = append(p.pending[from], tx) + sort.Sort(types.TxByNonce(p.pending[from])) + } + errs = append(errs, err) } - return make([]error, len(txs)) -} - -func (p *dummyTxPool) AddLocals(txs []*types.Transaction) []error { - return p.AddRemotes(txs) + return errs } -func (p *dummyTxPool) AddLocal(tx *types.Transaction) error { - return p.AddLocals([]*types.Transaction{tx})[0] -} - -func (p *dummyTxPool) Nonce(addr common.Address) uint64 { - return 0 +func (p *dummyTxPool) Count() int { + return len(p.index) } func (p *dummyTxPool) Stats() (int, int) { return p.Count(), 0 } -func (p *dummyTxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { - return nil, nil -} +func (p *dummyTxPool) Has(txid common.Hash) bool { + p.lock.RLock() + defer p.lock.RUnlock() -func (p *dummyTxPool) ContentFrom(addr common.Address) (types.Transactions, types.Transactions) { - return nil, nil + _, ok := p.index[txid] + return ok } // Pending returns all the transactions known to the pool @@ -65,109 +70,101 @@ func (p *dummyTxPool) Pending(enforceTips bool) (map[common.Address]types.Transa p.lock.RLock() defer p.lock.RUnlock() - batches := make(map[common.Address]types.Transactions) - for _, tx := range p.pool { - from, _ := types.Sender(p.signer, tx) - batches[from] = append(batches[from], tx) - } - for _, batch := range batches { - sort.Sort(types.TxByNonce(batch)) + return p.clonePending(), nil +} + +func (p *dummyTxPool) clonePending() map[common.Address]types.Transactions { + clone := make(map[common.Address]types.Transactions, len(p.pending)) + + for from, txs := range p.pending { + tt := make(types.Transactions, len(txs)) + for i, tx := range txs { + tt[i] = tx + } + clone[from] = tt } - return batches, nil + + return clone } -func (p *dummyTxPool) SubscribeNewTxsNotify(ch chan<- evmcore.NewTxsNotify) notify.Subscription { - return p.txFeed.Subscribe(ch) +func (p *dummyTxPool) Clear() { + p.lock.Lock() + defer p.lock.Unlock() + + p.index = make(map[common.Hash]common.Address) + p.pending = make(map[common.Address]types.Transactions) } -func (p *dummyTxPool) Map() map[common.Hash]*types.Transaction { - p.lock.RLock() - defer p.lock.RUnlock() - res := make(map[common.Hash]*types.Transaction, len(p.pool)) - for _, tx := range p.pool { - res[tx.Hash()] = tx +func (p *dummyTxPool) Delete(needle common.Hash) { + p.lock.Lock() + defer p.lock.Unlock() + + from, ok := p.index[needle] + if !ok { + return } - return res -} -func (p *dummyTxPool) Get(txid common.Hash) *types.Transaction { - p.lock.RLock() - defer p.lock.RUnlock() - for _, tx := range p.pool { - if tx.Hash() == txid { - return tx + txs := p.pending[from] + if len(txs) < 2 { + delete(p.pending, from) + } else { + notErased := make(types.Transactions, 0, len(txs)-1) + for _, tx := range txs { + if tx.Hash() != needle { + notErased = append(notErased, tx) + } } + p.pending[from] = notErased } + delete(p.index, needle) +} + +func (p *dummyTxPool) AddLocals(txs []*types.Transaction) []error { + panic("is not implemented") return nil } -func (p *dummyTxPool) Has(txid common.Hash) bool { - p.lock.RLock() - defer p.lock.RUnlock() - for _, tx := range p.pool { - if tx.Hash() == txid { - return true - } - } - return false +func (p *dummyTxPool) AddLocal(tx *types.Transaction) error { + panic("is not implemented") + return nil } -func (p *dummyTxPool) OnlyNotExisting(txids []common.Hash) []common.Hash { - m := p.Map() - notExisting := make([]common.Hash, 0, len(txids)) - for _, txid := range txids { - if m[txid] == nil { - notExisting = append(notExisting, txid) - } - } - return notExisting +func (p *dummyTxPool) Nonce(addr common.Address) uint64 { + panic("is not implemented") + return 0 } -func (p *dummyTxPool) SampleHashes(max int) []common.Hash { - p.lock.RLock() - defer p.lock.RUnlock() - res := make([]common.Hash, 0, max) - skip := 0 - if len(p.pool) > max { - skip = rand.Intn(len(p.pool) - max) - } - for _, tx := range p.pool { - if len(res) >= max { - break - } - if skip > 0 { - skip-- - continue - } - res = append(res, tx.Hash()) - } - return res +func (p *dummyTxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { + panic("is not implemented") + return nil, nil } -func (p *dummyTxPool) Count() int { - return len(p.pool) +func (p *dummyTxPool) ContentFrom(addr common.Address) (types.Transactions, types.Transactions) { + panic("is not implemented") + return nil, nil } -func (p *dummyTxPool) Clear() { - p.lock.Lock() - defer p.lock.Unlock() - if len(p.pool) != 0 { - p.pool = p.pool[:0] - } +func (p *dummyTxPool) SubscribeNewTxsNotify(ch chan<- evmcore.NewTxsNotify) notify.Subscription { + panic("is not implemented") + return nil } -func (p *dummyTxPool) Delete(needle common.Hash) { - p.lock.Lock() - defer p.lock.Unlock() +func (p *dummyTxPool) Map() map[common.Hash]*types.Transaction { + panic("is not implemented") + return nil +} - if len(p.pool) < 1 { - return - } - notErased := make([]*types.Transaction, 0, len(p.pool)-1) - for _, tx := range p.pool { - if tx.Hash() != needle { - notErased = append(notErased, tx) - } - } - p.pool = notErased +func (p *dummyTxPool) Get(txid common.Hash) *types.Transaction { + panic("is not implemented") + return nil +} + +func (p *dummyTxPool) OnlyNotExisting(txids []common.Hash) []common.Hash { + panic("is not implemented") + return nil +} + +func (p *dummyTxPool) SampleHashes(max int) []common.Hash { + panic("is not implemented") + return nil } From bb28ed12896b13aa32aeb4e71da260bc4d135819 Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 16 Nov 2023 21:14:01 +1000 Subject: [PATCH 10/20] testEnv.transactor fills tx --- gossip/common_test.go | 117 +++++++++++++++++++++++++++--------------- 1 file changed, 77 insertions(+), 40 deletions(-) diff --git a/gossip/common_test.go b/gossip/common_test.go index ba8b472bd..ce9a5193c 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/Fantom-foundation/go-opera/ethapi" "github.com/Fantom-foundation/go-opera/evmcore" "github.com/Fantom-foundation/go-opera/gossip/blockproc" "github.com/Fantom-foundation/go-opera/gossip/emitter" @@ -40,8 +41,8 @@ import ( ) const ( - gasLimit = uint64(21000) - maxGasLimit = uint64(6000000) + gasPrice = uint64(1e12) + genesisBalance = 1e18 genesisStake = 2 * 4e6 @@ -60,8 +61,9 @@ type testEnv struct { nonces map[common.Address]uint64 callback callbacks *Service - signer valkeystore.SignerI - pubkeys []validatorpk.PubKey + transactor *ethapi.PublicTransactionPoolAPI + signer valkeystore.SignerI + pubkeys []validatorpk.PubKey } func panics(name string) func(error) { @@ -162,8 +164,8 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator) *testEnv { if err != nil { panic(err) } - txPool.Signer = env.Service.EthAPI.signer + env.transactor = ethapi.NewPublicTransactionPoolAPI(env.Service.EthAPI, new(ethapi.AddrLocker)) err = engine.Bootstrap(env.GetConsensusCallbacks()) if err != nil { @@ -308,13 +310,24 @@ func (env *testEnv) EmitUntil(stop func() bool) error { func (env *testEnv) Transfer(from, to idx.ValidatorID, amount *big.Int) *types.Transaction { sender := env.Address(from) - nonce, _ := env.PendingNonceAt(nil, sender) - env.incNonce(sender) - key := env.privateKey(from) + nonce := env.nextNonce(sender) receiver := env.Address(to) - gp := new(big.Int).SetUint64(1e12) - tx := types.NewTransaction(nonce, receiver, amount, gasLimit, gp, nil) - tx, err := types.SignTx(tx, env.EthAPI.signer, key) + gasLimit := env.GetEvmStateReader().MaxGasLimit() + + raw, err := env.transactor.FillTransaction(context.Background(), ethapi.TransactionArgs{ + From: &sender, + To: &receiver, + Value: (*hexutil.Big)(amount), + Nonce: (*hexutil.Uint64)(&nonce), + Gas: (*hexutil.Uint64)(&gasLimit), + GasPrice: (*hexutil.Big)(new(big.Int).SetUint64(gasPrice)), + }) + if err != nil { + panic(err) + } + + key := env.privateKey(from) + tx, err := types.SignTx(raw.Tx, env.EthAPI.signer, key) if err != nil { panic(err) } @@ -324,13 +337,24 @@ func (env *testEnv) Transfer(from, to idx.ValidatorID, amount *big.Int) *types.T func (env *testEnv) Contract(from idx.ValidatorID, amount *big.Int, hex string) *types.Transaction { sender := env.Address(from) - nonce, _ := env.PendingNonceAt(nil, sender) - env.incNonce(sender) - key := env.privateKey(from) - gp := env.store.GetRules().Economy.MinGasPrice + nonce := env.nextNonce(sender) + gasLimit := env.GetEvmStateReader().MaxGasLimit() data := hexutil.MustDecode(hex) - tx := types.NewContractCreation(nonce, amount, maxGasLimit, gp, data) - tx, err := types.SignTx(tx, env.EthAPI.signer, key) + + raw, err := env.transactor.FillTransaction(context.Background(), ethapi.TransactionArgs{ + From: &sender, + Value: (*hexutil.Big)(amount), + Nonce: (*hexutil.Uint64)(&nonce), + Gas: (*hexutil.Uint64)(&gasLimit), + GasPrice: (*hexutil.Big)(new(big.Int).SetUint64(gasPrice)), + Data: (*hexutil.Bytes)(&data), + }) + if err != nil { + panic(err) + } + + key := env.privateKey(from) + tx, err := types.SignTx(raw.Tx, env.EthAPI.signer, key) if err != nil { panic(err) } @@ -349,25 +373,40 @@ func (env *testEnv) Address(n idx.ValidatorID) common.Address { return addr } -func (env *testEnv) Payer(n idx.ValidatorID, amounts ...*big.Int) *bind.TransactOpts { - key := env.privateKey(n) - t, _ := bind.NewKeyedTransactorWithChainID(key, new(big.Int).SetUint64(env.store.GetRules().NetworkID)) - nonce, _ := env.PendingNonceAt(nil, env.Address(n)) - t.Nonce = big.NewInt(int64(nonce)) - t.Value = big.NewInt(0) - for _, amount := range amounts { - t.Value.Add(t.Value, amount) +func (env *testEnv) Pay(from idx.ValidatorID, amounts ...*big.Int) *bind.TransactOpts { + sender := env.Address(from) + nonce := env.nextNonce(sender) + amount := big.NewInt(0) + for _, a := range amounts { + amount.Add(amount, a) } - t.GasLimit = env.GetEvmStateReader().MaxGasLimit() - t.GasPrice = env.GetEvmStateReader().MinGasPrice() + gasLimit := env.GetEvmStateReader().MaxGasLimit() - return t -} + data := []byte{0} // fake -func (env *testEnv) Pay(n idx.ValidatorID, amounts ...*big.Int) *bind.TransactOpts { - t := env.Payer(n, amounts...) - env.incNonce(t.From) + raw, err := env.transactor.FillTransaction(context.Background(), ethapi.TransactionArgs{ + From: &sender, + Value: (*hexutil.Big)(amount), + Nonce: (*hexutil.Uint64)(&nonce), + Gas: (*hexutil.Uint64)(&gasLimit), + GasPrice: (*hexutil.Big)(new(big.Int).SetUint64(gasPrice)), + Data: (*hexutil.Bytes)(&data), + }) + if err != nil { + panic(err) + } + key := env.privateKey(from) + t, _ := bind.NewKeyedTransactorWithChainID(key, new(big.Int).SetUint64(env.store.GetRules().NetworkID)) + { + t.Nonce = big.NewInt(int64(nonce)) + t.Value = amount + t.NoSend = true + t.GasLimit = raw.Tx.Gas() + t.GasPrice = raw.Tx.GasPrice() + //t.GasFeeCap = raw.Tx.GasFeeCap() + //t.GasTipCap = raw.Tx.GasTipCap() + } return t } @@ -380,8 +419,10 @@ func (env *testEnv) State() *state.StateDB { return statedb } -func (env *testEnv) incNonce(account common.Address) { - env.nonces[account] += 1 +func (env *testEnv) nextNonce(account common.Address) uint64 { + nonce := env.nonces[account] + env.nonces[account] = nonce + 1 + return nonce } /* @@ -498,12 +539,8 @@ func (env *testEnv) SuggestGasPrice(ctx context.Context) (*big.Int, error) { // transactions may be added or removed by miners, but it should provide a basis // for setting a reasonable default. func (env *testEnv) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { - if call.To == nil { - gas = gasLimit * 10000 - } else { - gas = gasLimit * 10 - } - return + panic("is not implemented") + return 0, nil } // SendTransaction injects the transaction into the pending pool for execution. From 8edaeeaccdb45fc2d7585731920258acca2a1219 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 17 Nov 2023 14:52:50 +1000 Subject: [PATCH 11/20] testEnv.ApplyTxs() waits for exactly txs --- gossip/common_test.go | 109 ++++++++++++++++++++++++++++++++---------- 1 file changed, 85 insertions(+), 24 deletions(-) diff --git a/gossip/common_test.go b/gossip/common_test.go index ce9a5193c..381b3ce62 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -8,6 +8,7 @@ import ( "math" "math/big" "sync" + "sync/atomic" "time" "github.com/Fantom-foundation/lachesis-base/abft" @@ -105,7 +106,9 @@ func (em testEmitterWorldExternal) Build(e *inter.MutableEventPayload, onIndexed if em.env.callback.buildEvent != nil { em.env.callback.buildEvent(e) } - return em.External.Build(e, onIndexed) + err := em.External.Build(e, onIndexed) + + return err } func (em testEmitterWorldExternal) Broadcast(*inter.EventPayload) {} @@ -119,6 +122,7 @@ func (p testConfirmedEventsProcessor) ProcessConfirmedEvent(e inter.EventI) { if p.env.callback.onEventConfirmed != nil { p.env.callback.onEventConfirmed(e) } + p.ConfirmedEventsProcessor.ProcessConfirmedEvent(e) } @@ -219,41 +223,99 @@ func (env *testEnv) GetEvmStateReader() *EvmStateReader { func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { env.t = env.t.Add(spent) - externalReceipts := make(types.Receipts, 0, len(txs)) + env.callback.onEventConfirmed = func(e inter.EventI) { + for _, em := range env.emitters { + em.OnEventConfirmed(e) + } + } + + rest := int32(len(txs)) + waitFor := make(map[common.Hash]struct{}, len(txs)) + for _, tx := range txs { + + g, err := tx.EffectiveGasTip(env.store.GetRules().Economy.MinGasPrice) + + fmt.Printf("->> TX: %s, nonce: %d, g: %d, err: %s\n", tx.Hash().String(), tx.Nonce(), g, err) + waitFor[tx.Hash()] = struct{}{} + } + + receipts := make(types.Receipts, 0, len(txs)) + + wg := new(sync.WaitGroup) + wg.Add(1) + defer wg.Wait() - env.txpool.AddRemotes(txs) - defer env.txpool.(*dummyTxPool).Clear() newBlocks := make(chan evmcore.ChainHeadNotify) + defer close(newBlocks) chainHeadSub := env.feed.SubscribeNewBlock(newBlocks) - mu := &sync.Mutex{} + defer chainHeadSub.Unsubscribe() + go func() { + defer wg.Done() for b := range newBlocks { - if len(b.Block.Transactions) == 0 { - continue - } - receipts := env.store.evm.GetReceipts(idx.Block(b.Block.Number.Uint64()), env.EthAPI.signer, b.Block.Hash, b.Block.Transactions) - for i, tx := range b.Block.Transactions { - if r, _, _ := tx.RawSignatureValues(); r.Sign() != 0 { - mu.Lock() - externalReceipts = append(externalReceipts, receipts[i]) - mu.Unlock() - env.txpool.(*dummyTxPool).Delete(tx.Hash()) + + n := idx.Block(b.Block.Number.Uint64()) + // valid txs + if len(b.Block.Transactions) > 0 { + rr := env.store.evm.GetReceipts(n, env.EthAPI.signer, b.Block.Hash, b.Block.Transactions) + for _, r := range rr { + env.txpool.(*dummyTxPool).Delete(r.TxHash) + if _, ok := waitFor[r.TxHash]; ok { + fmt.Printf("<<- TX: %s, g: %d\n", r.TxHash.String(), r.GasUsed) + receipts = append(receipts, r) + delete(waitFor, r.TxHash) + atomic.AddInt32(&rest, -1) + } } } - if externalReceipts.Len() == len(txs) { - chainHeadSub.Unsubscribe() - close(newBlocks) - break + // invalid txs + block := env.store.GetBlock(n) + if len(block.SkippedTxs) > 0 { + var ( + internalsLen = uint32(len(block.InternalTxs)) + externalsLen = uint32(len(block.InternalTxs) + len(block.Txs)) + eventstxsLen = uint32(len(block.InternalTxs) + len(block.Txs) + 0) + e = 0 + ) + for _, txi := range block.SkippedTxs { + var tx common.Hash + + switch { + case txi < internalsLen: + tx = block.InternalTxs[txi+0] + case txi < externalsLen: + tx = block.Txs[txi-internalsLen] + default: + for { + etxs := env.store.GetEventPayload(block.Events[e]).Txs() + if txi < (eventstxsLen + uint32(len(etxs))) { + tx = etxs[txi-eventstxsLen].Hash() + break + } else { + e += 1 // next event + eventstxsLen += uint32(len(etxs)) + } + } + } + env.txpool.(*dummyTxPool).Delete(tx) + if _, ok := waitFor[tx]; ok { + fmt.Printf("<<- TX: %s, err: skipped\n", tx.String()) + delete(waitFor, tx) + atomic.AddInt32(&rest, -1) + } + } } } }() + + env.txpool.AddRemotes(txs) + defer env.txpool.(*dummyTxPool).Clear() + err := env.EmitUntil(func() bool { - mu.Lock() - defer mu.Unlock() - return externalReceipts.Len() == len(txs) + return atomic.LoadInt32(&rest) == 0 }) - return externalReceipts, err + return receipts, err } func (env *testEnv) ApplyMPs(spent time.Duration, mps ...inter.MisbehaviourProof) error { @@ -381,7 +443,6 @@ func (env *testEnv) Pay(from idx.ValidatorID, amounts ...*big.Int) *bind.Transac amount.Add(amount, a) } gasLimit := env.GetEvmStateReader().MaxGasLimit() - data := []byte{0} // fake raw, err := env.transactor.FillTransaction(context.Background(), ethapi.TransactionArgs{ From b033aafb4a5ca35bedf7d582047889bbd2f51763 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 17 Nov 2023 15:21:22 +1000 Subject: [PATCH 12/20] withLowGas() transact opts modificator --- gossip/common_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/gossip/common_test.go b/gossip/common_test.go index 381b3ce62..3817f5036 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -471,6 +471,32 @@ func (env *testEnv) Pay(from idx.ValidatorID, amounts ...*big.Int) *bind.Transac return t } +func withLowGas(opts *bind.TransactOpts) *bind.TransactOpts { + originSigner := opts.Signer + + opts.Signer = func(from common.Address, tx *types.Transaction) (*types.Transaction, error) { + gas, err := evmcore.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil) + if err != nil { + return nil, err + } + if tx.Gas() >= gas { + repack := &types.LegacyTx{ + To: tx.To(), + Nonce: tx.Nonce(), + GasPrice: tx.GasPrice(), + Gas: gas + 1, + Value: tx.Value(), + Data: tx.Data(), + } + tx = types.NewTx(repack) + } + return originSigner(from, tx) + } + + return opts + +} + func (env *testEnv) ReadOnly() *bind.CallOpts { return &bind.CallOpts{} } From 63be75134eab639b8a47f490c4f47791f5037430 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 17 Nov 2023 23:05:17 +1000 Subject: [PATCH 13/20] TestTxIndexing() stub --- gossip/txposition_test.go | 68 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 gossip/txposition_test.go diff --git a/gossip/txposition_test.go b/gossip/txposition_test.go new file mode 100644 index 000000000..2eb62f56e --- /dev/null +++ b/gossip/txposition_test.go @@ -0,0 +1,68 @@ +package gossip + +import ( + "fmt" + "math/big" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/Fantom-foundation/go-opera/gossip/contract/ballot" + "github.com/Fantom-foundation/go-opera/logger" +) + +func TestTxIndexing(t *testing.T) { + logger.SetTestMode(t) + logger.SetLevel("debug") + require := require.New(t) + + env := newTestEnv(2, 3) + defer env.Close() + + proposals := [][32]byte{ + ballotOption("Option 1"), + ballotOption("Option 2"), + ballotOption("Option 3"), + } + + // valid tx + _, tx1ok, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals) + require.NoError(err) + require.NotNil(cBallot) + require.NotNil(tx1ok) + + // invalid tx + tx2reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0)) + require.NoError(err) + require.NotNil(tx2reverted) + + // valid tx + tx3ok, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3)) + require.NoError(err) + require.NotNil(tx3ok) + + // invalid tx + tx4skipped, err := cBallot.Vote(withLowGas(env.Pay(2)), big.NewInt(0)) + require.NoError(err) + require.NotNil(tx4skipped) + + // valid tx + tx5ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0)) + require.NoError(err) + require.NotNil(tx5ok) + + receipts, err := env.ApplyTxs(nextEpoch, + tx1ok, + tx2reverted, + tx3ok, + tx4skipped, + tx5ok, + ) + require.NoError(err) + + for _, r := range receipts { + fmt.Printf(">>>>>>>>> tx[%s] status %d\n", r.TxHash.String(), r.Status) + } + + require.Len(receipts, 0) +} From ff09e6e7684b5660868d1abf694cd25cf95132f4 Mon Sep 17 00:00:00 2001 From: alex Date: Wed, 29 Nov 2023 21:13:42 +1000 Subject: [PATCH 14/20] gossip/testEnv.BlockTxs() to put txs in the same block --- gossip/common_test.go | 116 ++++++++++++++++++++++++-------------- gossip/txposition_test.go | 60 +++++++++++--------- 2 files changed, 107 insertions(+), 69 deletions(-) diff --git a/gossip/common_test.go b/gossip/common_test.go index 3817f5036..a05671f24 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -8,7 +8,6 @@ import ( "math" "math/big" "sync" - "sync/atomic" "time" "github.com/Fantom-foundation/lachesis-base/abft" @@ -111,7 +110,10 @@ func (em testEmitterWorldExternal) Build(e *inter.MutableEventPayload, onIndexed return err } -func (em testEmitterWorldExternal) Broadcast(*inter.EventPayload) {} +func (em testEmitterWorldExternal) Broadcast(emitted *inter.EventPayload) { + // PM listens and will broadcast it + em.env.feed.newEmittedEvent.Send(emitted) +} type testConfirmedEventsProcessor struct { blockproc.ConfirmedEventsProcessor @@ -199,6 +201,7 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator) *testEnv { env.RegisterEmitter(em) env.pubkeys = append(env.pubkeys, pubkey) em.Start() + em.Stop() // to control emitting manually } _ = env.store.GenerateSnapshotAt(common.Hash(store.GetBlockState().FinalizedStateRoot), false) @@ -221,35 +224,48 @@ func (env *testEnv) GetEvmStateReader() *EvmStateReader { } func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { - env.t = env.t.Add(spent) - - env.callback.onEventConfirmed = func(e inter.EventI) { - for _, em := range env.emitters { - em.OnEventConfirmed(e) - } - } + return env.applyTxs(spent, false, txs...) +} - rest := int32(len(txs)) - waitFor := make(map[common.Hash]struct{}, len(txs)) - for _, tx := range txs { +func (env *testEnv) BlockTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { + return env.applyTxs(spent, true, txs...) +} - g, err := tx.EffectiveGasTip(env.store.GetRules().Economy.MinGasPrice) +func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*types.Transaction) (types.Receipts, error) { + env.t = env.t.Add(spent) - fmt.Printf("->> TX: %s, nonce: %d, g: %d, err: %s\n", tx.Hash().String(), tx.Nonce(), g, err) - waitFor[tx.Hash()] = struct{}{} + waitForInEvents := make(map[common.Hash]struct{}, len(txs)) + waitForInBlocks := make(map[common.Hash]struct{}, len(txs)) + for _, tx := range txs { + waitForInEvents[tx.Hash()] = struct{}{} + waitForInBlocks[tx.Hash()] = struct{}{} } - receipts := make(types.Receipts, 0, len(txs)) - wg := new(sync.WaitGroup) - wg.Add(1) + wg.Add(2) defer wg.Wait() + newEvents := make(chan *inter.EventPayload) + defer close(newEvents) + eventsSub := env.feed.SubscribeNewEmitted(newEvents) + defer eventsSub.Unsubscribe() + go func() { + defer wg.Done() + for e := range newEvents { + for _, tx := range e.Txs() { + h := tx.Hash() + delete(waitForInEvents, h) + // env.txpool.(*dummyTxPool).Delete(tx.Hash()) + } + } + }() + + receipts := make(types.Receipts, 0, len(txs)) + newBlocks := make(chan evmcore.ChainHeadNotify) defer close(newBlocks) - chainHeadSub := env.feed.SubscribeNewBlock(newBlocks) - defer chainHeadSub.Unsubscribe() - + blocksSub := env.feed.SubscribeNewBlock(newBlocks) + defer blocksSub.Unsubscribe() go func() { defer wg.Done() for b := range newBlocks { @@ -260,11 +276,9 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty rr := env.store.evm.GetReceipts(n, env.EthAPI.signer, b.Block.Hash, b.Block.Transactions) for _, r := range rr { env.txpool.(*dummyTxPool).Delete(r.TxHash) - if _, ok := waitFor[r.TxHash]; ok { - fmt.Printf("<<- TX: %s, g: %d\n", r.TxHash.String(), r.GasUsed) + if _, ok := waitForInBlocks[r.TxHash]; ok { receipts = append(receipts, r) - delete(waitFor, r.TxHash) - atomic.AddInt32(&rest, -1) + delete(waitForInBlocks, r.TxHash) } } } @@ -298,22 +312,32 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty } } env.txpool.(*dummyTxPool).Delete(tx) - if _, ok := waitFor[tx]; ok { - fmt.Printf("<<- TX: %s, err: skipped\n", tx.String()) - delete(waitFor, tx) - atomic.AddInt32(&rest, -1) + if _, ok := waitForInBlocks[tx]; ok { + delete(waitForInBlocks, tx) } } } } }() + byEmitters := func() []*emitter.Emitter { + // datarace does not matter + if singleBlock && len(waitForInEvents) > 0 { + count := len(env.emitters) * 70 / 100 + // prevent block creation + return env.emitters[:count] + } + if len(waitForInBlocks) > 0 { + // allow block creation + return env.emitters + } + // ready to stop + return nil + } + env.txpool.AddRemotes(txs) defer env.txpool.(*dummyTxPool).Clear() - - err := env.EmitUntil(func() bool { - return atomic.LoadInt32(&rest) == 0 - }) + err := env.EmitUntil(byEmitters) return receipts, err } @@ -346,16 +370,24 @@ func (env *testEnv) ApplyMPs(spent time.Duration, mps ...inter.MisbehaviourProof env.callback.buildEvent = nil }() - return env.EmitUntil(func() bool { - return confirmed - }) -} + byEmitters := func() []*emitter.Emitter { + if !confirmed { + return env.emitters + } + return nil + } -func (env *testEnv) EmitUntil(stop func() bool) error { - t := time.Now() + return env.EmitUntil(byEmitters) +} - for !stop() { - for _, em := range env.emitters { +func (env *testEnv) EmitUntil(by func() []*emitter.Emitter) error { + start := time.Now() + for { + emitters := by() + if len(emitters) < 1 { + break + } + for _, em := range emitters { _, err := em.EmitEvent() if err != nil { return err @@ -363,7 +395,7 @@ func (env *testEnv) EmitUntil(stop func() bool) error { } env.WaitBlockEnd() env.t = env.t.Add(time.Second) - if time.Since(t) > 30*time.Second { + if time.Since(start) > 30*time.Second { panic("block doesn't get processed") } } diff --git a/gossip/txposition_test.go b/gossip/txposition_test.go index 2eb62f56e..e277d7071 100644 --- a/gossip/txposition_test.go +++ b/gossip/txposition_test.go @@ -5,6 +5,7 @@ import ( "math/big" "testing" + "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" "github.com/Fantom-foundation/go-opera/gossip/contract/ballot" @@ -25,44 +26,49 @@ func TestTxIndexing(t *testing.T) { ballotOption("Option 3"), } - // valid tx - _, tx1ok, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals) + // preparing + _, tx1pre, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals) require.NoError(err) require.NotNil(cBallot) - require.NotNil(tx1ok) - - // invalid tx - tx2reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0)) + require.NotNil(tx1pre) + tx2pre, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3)) require.NoError(err) - require.NotNil(tx2reverted) - - // valid tx - tx3ok, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3)) + require.NotNil(tx2pre) + receipts, err := env.BlockTxs(nextEpoch, + tx1pre, + tx2pre, + ) require.NoError(err) - require.NotNil(tx3ok) - + require.Len(receipts, 2) + for i, r := range receipts { + require.Equal(types.ReceiptStatusSuccessful, r.Status, i) + } // invalid tx - tx4skipped, err := cBallot.Vote(withLowGas(env.Pay(2)), big.NewInt(0)) + tx1reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0)) require.NoError(err) - require.NotNil(tx4skipped) - + require.NotNil(tx1reverted) // valid tx - tx5ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0)) + tx2ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0)) require.NoError(err) - require.NotNil(tx5ok) + require.NotNil(tx2ok) + // skipped tx + _, tx3skipped, _, err := ballot.DeployBallot(withLowGas(env.Pay(1)), env, proposals) + require.NoError(err) + require.NotNil(tx3skipped) - receipts, err := env.ApplyTxs(nextEpoch, - tx1ok, - tx2reverted, - tx3ok, - tx4skipped, - tx5ok, + receipts, err = env.BlockTxs(nextEpoch, + tx1reverted, + tx2ok, + tx3skipped, ) require.NoError(err) - - for _, r := range receipts { - fmt.Printf(">>>>>>>>> tx[%s] status %d\n", r.TxHash.String(), r.Status) + require.Len(receipts, 3) + var block *big.Int + for i, r := range receipts { + if block == nil { + block = r.BlockNumber + } + require.Equal(block.Uint64(), r.BlockNumber.Uint64(), i) } - require.Len(receipts, 0) } From f0b3bdd89a2e05f0eb2a8c5fcc382a2dd9b9d654 Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 6 Feb 2024 22:58:35 +1000 Subject: [PATCH 15/20] another approach to gossip/testEnv.BlockTxs() --- gossip/common_test.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/gossip/common_test.go b/gossip/common_test.go index a05671f24..0586c64cc 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -14,6 +14,7 @@ import ( "github.com/Fantom-foundation/lachesis-base/hash" "github.com/Fantom-foundation/lachesis-base/inter/dag" "github.com/Fantom-foundation/lachesis-base/inter/idx" + "github.com/Fantom-foundation/lachesis-base/lachesis" "github.com/Fantom-foundation/lachesis-base/utils/cachescale" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -228,7 +229,31 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty } func (env *testEnv) BlockTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { - return env.applyTxs(spent, true, txs...) + // Just `env.applyTxs(spent, true, txs...)` does not work guaranteed because of gas rules. + // So we make single-event block and skip emitting process. + env.t = env.t.Add(spent) + + mutEvent := &inter.MutableEventPayload{} + mutEvent.SetVersion(1) // LLR + mutEvent.SetEpoch(env.store.GetEpoch()) + mutEvent.SetCreationTime(inter.Timestamp(env.t.UnixNano())) + mutEvent.SetTxs(types.Transactions(txs)) + event := mutEvent.Build() + env.store.SetEvent(event) + + consensus := env.Service.GetConsensusCallbacks() + blockCallback := consensus.BeginBlock(&lachesis.Block{ + Atropos: event.ID(), + Cheaters: make([]idx.ValidatorID, 0), + }) + blockCallback.ApplyEvent(event) + blockCallback.EndBlock() + env.blockProcWg.Wait() + + number := env.store.GetBlockIndex(event.ID()) + block := env.GetEvmStateReader().GetBlock(common.Hash{}, uint64(*number)) + receipts := env.store.evm.GetReceipts(*number, env.EthAPI.signer, block.Hash, block.Transactions) + return receipts, nil } func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*types.Transaction) (types.Receipts, error) { @@ -255,7 +280,7 @@ func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*type for _, tx := range e.Txs() { h := tx.Hash() delete(waitForInEvents, h) - // env.txpool.(*dummyTxPool).Delete(tx.Hash()) + env.txpool.(*dummyTxPool).Delete(tx.Hash()) } } }() From 9b6f168a556c50f678480dd9d46496f1add7f73d Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 9 Feb 2024 19:26:53 +1000 Subject: [PATCH 16/20] TestTxIndexing() shows the effect of the fix --- gossip/txposition_test.go | 67 ++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/gossip/txposition_test.go b/gossip/txposition_test.go index e277d7071..8651d2ae3 100644 --- a/gossip/txposition_test.go +++ b/gossip/txposition_test.go @@ -1,10 +1,10 @@ package gossip import ( - "fmt" "math/big" "testing" + "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/require" @@ -17,16 +17,21 @@ func TestTxIndexing(t *testing.T) { logger.SetLevel("debug") require := require.New(t) - env := newTestEnv(2, 3) + var ( + epoch = idx.Epoch(256) + validators = idx.Validator(3) + proposals = [][32]byte{ + ballotOption("Option 1"), + ballotOption("Option 2"), + ballotOption("Option 3"), + } + ) + + env := newTestEnv(epoch, validators) defer env.Close() - proposals := [][32]byte{ - ballotOption("Option 1"), - ballotOption("Option 2"), - ballotOption("Option 3"), - } + // Preparing: - // preparing _, tx1pre, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals) require.NoError(err) require.NotNil(cBallot) @@ -34,7 +39,7 @@ func TestTxIndexing(t *testing.T) { tx2pre, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3)) require.NoError(err) require.NotNil(tx2pre) - receipts, err := env.BlockTxs(nextEpoch, + receipts, err := env.ApplyTxs(sameEpoch, tx1pre, tx2pre, ) @@ -43,6 +48,9 @@ func TestTxIndexing(t *testing.T) { for i, r := range receipts { require.Equal(types.ReceiptStatusSuccessful, r.Status, i) } + + // Testing: + // invalid tx tx1reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0)) require.NoError(err) @@ -52,23 +60,46 @@ func TestTxIndexing(t *testing.T) { require.NoError(err) require.NotNil(tx2ok) // skipped tx - _, tx3skipped, _, err := ballot.DeployBallot(withLowGas(env.Pay(1)), env, proposals) + tx3skipped := tx2ok + // valid tx + tx4ok, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(2)) require.NoError(err) - require.NotNil(tx3skipped) + require.NotNil(tx1reverted) - receipts, err = env.BlockTxs(nextEpoch, + receipts, err = env.BlockTxs(sameEpoch, tx1reverted, tx2ok, tx3skipped, - ) + tx4ok) require.NoError(err) require.Len(receipts, 3) - var block *big.Int + + var blockN *big.Int for i, r := range receipts { - if block == nil { - block = r.BlockNumber + if blockN == nil { + blockN = r.BlockNumber } - require.Equal(block.Uint64(), r.BlockNumber.Uint64(), i) - } + require.Equal(blockN.Uint64(), r.BlockNumber.Uint64(), i) + txPos := env.store.evm.GetTxPosition(r.TxHash) + require.NotNil(txPos) + + switch r.TxHash { + case tx1reverted.Hash(): + require.Equal(types.ReceiptStatusFailed, r.Status, i) + require.Equal(txPos.BlockOffset, uint32(0)) + case tx2ok.Hash(): + require.Equal(types.ReceiptStatusSuccessful, r.Status, i) + require.Equal(txPos.BlockOffset, uint32(1)) + case tx3skipped.Hash(): + t.Fatal("skipped tx's receipt found") + case tx4ok.Hash(): + require.Equal(types.ReceiptStatusSuccessful, r.Status, i) + require.Equal(txPos.BlockOffset, uint32(3)) // THAT shows the effect of the fix #524 + } + + for j, l := range r.Logs { + require.Equal(txPos.BlockOffset, l.TxIndex, j) + } + } } From 2e5dac190cd3af744b72173bf1942b7619c85680 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 9 Feb 2024 22:56:24 +1000 Subject: [PATCH 17/20] revert the fix --- gossip/c_block_callbacks.go | 19 +++++++++++-------- gossip/txposition_test.go | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index cf026f043..d4fe3fea2 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -319,6 +319,7 @@ func consensusCallbackBeginBlockFn( for _, e := range blockEvents { txs = append(txs, e.Txs()...) } + _ = evmProcessor.Execute(txs) evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize() @@ -343,16 +344,18 @@ func consensusCallbackBeginBlockFn( } } } + // memorize block position of each tx + for i, tx := range evmBlock.Transactions { + // not skipped txs only + position := txPositions[tx.Hash()] + position.Block = blockCtx.Idx + position.BlockOffset = uint32(i) + txPositions[tx.Hash()] = position + } + // call OnNewReceipt for i, r := range allReceipts { - // memorize block position for not skipped txs only - position := txPositions[r.TxHash] - position.Block = blockCtx.Idx - position.BlockOffset = uint32(r.TransactionIndex) - txPositions[r.TxHash] = position - // call OnNewReceipt - creator := position.EventCreator - // TODO: is it check necessary? + creator := txPositions[r.TxHash].EventCreator if creator != 0 && es.Validators.Get(creator) == 0 { creator = 0 } diff --git a/gossip/txposition_test.go b/gossip/txposition_test.go index 8651d2ae3..683e36d76 100644 --- a/gossip/txposition_test.go +++ b/gossip/txposition_test.go @@ -95,7 +95,7 @@ func TestTxIndexing(t *testing.T) { t.Fatal("skipped tx's receipt found") case tx4ok.Hash(): require.Equal(types.ReceiptStatusSuccessful, r.Status, i) - require.Equal(txPos.BlockOffset, uint32(3)) // THAT shows the effect of the fix #524 + require.Equal(txPos.BlockOffset, uint32(2)) // skipped txs aren't counted } for j, l := range r.Logs { From 6548d12972512ed3978f7d5a41d6c06596081049 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 9 Feb 2024 23:02:04 +1000 Subject: [PATCH 18/20] no data migration needed --- gossip/store_migration.go | 127 +------------------------------------- 1 file changed, 1 insertion(+), 126 deletions(-) diff --git a/gossip/store_migration.go b/gossip/store_migration.go index 3d94ea824..5c46b0e88 100644 --- a/gossip/store_migration.go +++ b/gossip/store_migration.go @@ -3,18 +3,11 @@ package gossip import ( "errors" "fmt" - "sync" - "sync/atomic" - "time" "github.com/Fantom-foundation/lachesis-base/hash" - "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/Fantom-foundation/lachesis-base/kvdb" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/Fantom-foundation/go-opera/gossip/evmstore" "github.com/Fantom-foundation/go-opera/inter" "github.com/Fantom-foundation/go-opera/inter/iblockproc" "github.com/Fantom-foundation/go-opera/utils/migration" @@ -35,9 +28,6 @@ func (s *Store) migrateData() error { } err := s.migrations().Exec(versions, s.flushDBs) - if err != nil { - panic(err) - } return err } @@ -53,8 +43,7 @@ func (s *Store) migrations() *migration.Migration { Next("erase gossip-async db", s.eraseGossipAsyncDB). Next("erase SFC API table", s.eraseSfcApiTable). Next("erase legacy genesis DB", s.eraseGenesisDB). - Next("calculate upgrade heights", s.calculateUpgradeHeights). - Next("EVM TxPosition.BlockOffset fix", s.fixTxPositionBlockOffset) + Next("calculate upgrade heights", s.calculateUpgradeHeights) } func unsupportedMigration() error { @@ -152,117 +141,3 @@ func (s *Store) calculateUpgradeHeights() error { } return nil } - -func (s *Store) fixTxPositionBlockOffset() (err error) { - const ( - parallels = 10 - ) - receiptsTable, _ := s.dbs.OpenDB("evm/r") - txPositionsTable, _ := s.dbs.OpenDB("evm/x") - - // for each block's receipts - var ( - wg sync.WaitGroup - items = new(uint32) - ) - processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) { - defer wg.Done() - pos := new(evmstore.TxPosition) - for rr := range input { - for _, r := range rr { - key := r.TxHash.Bytes() - got := s.rlp.Get(txPositionsTable, key, pos) - if got == nil { - continue - } - pos.BlockOffset = uint32(r.TransactionIndex) - s.rlp.Set(txPositionsTable, key, pos) - - atomic.AddUint32(items, 1) - } - } - } - - // for each block - var ( - blocks = new(uint32) - ) - processBlocksRange := func(from, to idx.Block) { - defer wg.Done() - wg.Add(parallels) - threads := make([]chan []*types.ReceiptForStorage, parallels) - for i := range threads { - threads[i] = make(chan []*types.ReceiptForStorage, 10) - go processBlockReceipts(threads[i]) - } - - it := receiptsTable.NewIterator(nil, from.Bytes()) - defer it.Release() - for n := 0; it.Next(); n++ { - if idx.BytesToBlock(it.Key()) > to { - break - } - atomic.AddUint32(blocks, 1) - - var receiptsStorage []*types.ReceiptForStorage - err := rlp.DecodeBytes(it.Value(), &receiptsStorage) - if err != nil { - s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value())) - } - threads[n%parallels] <- receiptsStorage - } - for i := range threads { - close(threads[i]) - } - } - - // status log - var ( - done = make(chan struct{}) - ) - defer close(done) - go func() { - var ( - start = time.Now() - prevFlushTime = time.Now() - ) - for again := true; again; { - select { - case <-time.After(s.cfg.MaxNonFlushedPeriod / 5): - again = true - case <-done: - again = false - } - s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "blocks", atomic.LoadUint32(blocks), "items", atomic.LoadUint32(items)) - if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod { - prevFlushTime = time.Now() - err = s.flushDBs() - if err != nil { - break - } - } - } - }() - - // params - firstBlock := func() idx.Block { - it := receiptsTable.NewIterator(nil, nil) - defer it.Release() - if it.Next() { - return idx.BytesToBlock(it.Key()) - } - return 0 - }() - lastBlock := s.GetBlockState().LastBlock.Idx - - // main start - s.Log.Debug("processBlocksRange", "from", firstBlock, "to", lastBlock) - step := (lastBlock - firstBlock) / parallels - wg.Add(parallels + 1) - for i := idx.Block(0); i <= parallels; i++ { - go processBlocksRange(firstBlock+i*step, firstBlock+(i+1)*step-1) - } - wg.Wait() - - return -} From d9d5d4fa57ab9c81e9227ab0a74df7c644dcf156 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 16 Feb 2024 21:10:57 +1000 Subject: [PATCH 19/20] testEnv.applyTxs() simplification --- gossip/common_test.go | 53 +++++++++++++---------------------------- gossip/dummy_tx_pool.go | 3 +++ 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/gossip/common_test.go b/gossip/common_test.go index 0586c64cc..bbed36242 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -8,6 +8,7 @@ import ( "math" "math/big" "sync" + "sync/atomic" "time" "github.com/Fantom-foundation/lachesis-base/abft" @@ -224,10 +225,6 @@ func (env *testEnv) GetEvmStateReader() *EvmStateReader { } } -func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { - return env.applyTxs(spent, false, txs...) -} - func (env *testEnv) BlockTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { // Just `env.applyTxs(spent, true, txs...)` does not work guaranteed because of gas rules. // So we make single-event block and skip emitting process. @@ -256,35 +253,23 @@ func (env *testEnv) BlockTxs(spent time.Duration, txs ...*types.Transaction) (ty return receipts, nil } -func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*types.Transaction) (types.Receipts, error) { +func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { + return env.applyTxs(spent, txs...) +} + +func (env *testEnv) applyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) { env.t = env.t.Add(spent) - waitForInEvents := make(map[common.Hash]struct{}, len(txs)) - waitForInBlocks := make(map[common.Hash]struct{}, len(txs)) + waitForCount := int64(len(txs)) + waitForTxs := make(map[common.Hash]struct{}, len(txs)) for _, tx := range txs { - waitForInEvents[tx.Hash()] = struct{}{} - waitForInBlocks[tx.Hash()] = struct{}{} + waitForTxs[tx.Hash()] = struct{}{} } wg := new(sync.WaitGroup) - wg.Add(2) + wg.Add(1) defer wg.Wait() - newEvents := make(chan *inter.EventPayload) - defer close(newEvents) - eventsSub := env.feed.SubscribeNewEmitted(newEvents) - defer eventsSub.Unsubscribe() - go func() { - defer wg.Done() - for e := range newEvents { - for _, tx := range e.Txs() { - h := tx.Hash() - delete(waitForInEvents, h) - env.txpool.(*dummyTxPool).Delete(tx.Hash()) - } - } - }() - receipts := make(types.Receipts, 0, len(txs)) newBlocks := make(chan evmcore.ChainHeadNotify) @@ -301,9 +286,10 @@ func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*type rr := env.store.evm.GetReceipts(n, env.EthAPI.signer, b.Block.Hash, b.Block.Transactions) for _, r := range rr { env.txpool.(*dummyTxPool).Delete(r.TxHash) - if _, ok := waitForInBlocks[r.TxHash]; ok { + if _, ok := waitForTxs[r.TxHash]; ok { receipts = append(receipts, r) - delete(waitForInBlocks, r.TxHash) + delete(waitForTxs, r.TxHash) + atomic.AddInt64(&waitForCount, -1) } } } @@ -337,8 +323,9 @@ func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*type } } env.txpool.(*dummyTxPool).Delete(tx) - if _, ok := waitForInBlocks[tx]; ok { - delete(waitForInBlocks, tx) + if _, ok := waitForTxs[tx]; ok { + delete(waitForTxs, tx) + atomic.AddInt64(&waitForCount, -1) } } } @@ -346,13 +333,7 @@ func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*type }() byEmitters := func() []*emitter.Emitter { - // datarace does not matter - if singleBlock && len(waitForInEvents) > 0 { - count := len(env.emitters) * 70 / 100 - // prevent block creation - return env.emitters[:count] - } - if len(waitForInBlocks) > 0 { + if atomic.LoadInt64(&waitForCount) > 0 { // allow block creation return env.emitters } diff --git a/gossip/dummy_tx_pool.go b/gossip/dummy_tx_pool.go index 763d6cca3..035bb5465 100644 --- a/gossip/dummy_tx_pool.go +++ b/gossip/dummy_tx_pool.go @@ -50,6 +50,9 @@ func (p *dummyTxPool) AddRemotes(txs []*types.Transaction) []error { } func (p *dummyTxPool) Count() int { + p.lock.Lock() + defer p.lock.Unlock() + return len(p.index) } From 59d028c9283420dc67ecfa2685937ce148f11f09 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 16 Feb 2024 21:32:18 +1000 Subject: [PATCH 20/20] fix of TestConsensusCallback() --- gossip/c_block_callbacks_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/gossip/c_block_callbacks_test.go b/gossip/c_block_callbacks_test.go index 687f2ff11..769b85520 100644 --- a/gossip/c_block_callbacks_test.go +++ b/gossip/c_block_callbacks_test.go @@ -17,9 +17,10 @@ func TestConsensusCallback(t *testing.T) { logger.SetTestMode(t) require := require.New(t) - const rounds = 30 - - const validatorsNum = 3 + const ( + rounds = 30 + validatorsNum = 3 + ) env := newTestEnv(2, validatorsNum) defer env.Close() @@ -32,11 +33,10 @@ func TestConsensusCallback(t *testing.T) { for n := uint64(0); n < rounds; n++ { // transfers - txs := make([]*types.Transaction, validatorsNum) - for i := idx.Validator(0); i < validatorsNum; i++ { - from := i % validatorsNum - to := 0 - txs[i] = env.Transfer(idx.ValidatorID(from+1), idx.ValidatorID(to+1), utils.ToFtm(100)) + txs := make([]*types.Transaction, 0, validatorsNum-1) + for to, from := 0, 1; from < validatorsNum; from++ { + transfer := env.Transfer(idx.ValidatorID(from+1), idx.ValidatorID(to+1), utils.ToFtm(100)) + txs = append(txs, transfer) } tm := sameEpoch if n%10 == 0 { @@ -47,7 +47,7 @@ func TestConsensusCallback(t *testing.T) { // subtract fees for i, r := range rr { fee := big.NewInt(0).Mul(new(big.Int).SetUint64(r.GasUsed), txs[i].GasPrice()) - balances[i] = big.NewInt(0).Sub(balances[i], fee) + balances[i+1] = big.NewInt(0).Sub(balances[i+1], fee) } // balance movements balances[0].Add(balances[0], utils.ToFtm(200))