diff --git a/cmd/bootstrap/utils/md5.go b/cmd/bootstrap/utils/md5.go index 3abe9c42948..4d4bbe21046 100644 --- a/cmd/bootstrap/utils/md5.go +++ b/cmd/bootstrap/utils/md5.go @@ -1,8 +1,8 @@ package utils // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5 -// #nosec import ( + // #nosec "crypto/md5" "io" "os" diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 4c7f5f511e3..a22a5472f09 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -52,7 +52,6 @@ import ( txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/ingestion" "github.com/onflow/flow-go/engine/execution/ingestion/fetcher" - "github.com/onflow/flow-go/engine/execution/ingestion/loader" "github.com/onflow/flow-go/engine/execution/ingestion/stop" "github.com/onflow/flow-go/engine/execution/ingestion/uploader" exeprovider "github.com/onflow/flow-go/engine/execution/provider" @@ -1081,61 +1080,24 @@ func (exeNode *ExecutionNode) LoadIngestionEngine( exeNode.collectionRequester = reqEng } - if exeNode.exeConf.enableNewIngestionEngine { - _, core, err := ingestion.NewMachine( - node.Logger, - node.ProtocolEvents, - exeNode.collectionRequester, - colFetcher, - node.Storage.Headers, - node.Storage.Blocks, - node.Storage.Collections, - exeNode.executionState, - node.State, - exeNode.collector, - exeNode.computationManager, - exeNode.providerEngine, - exeNode.blockDataUploader, - exeNode.stopControl, - ) - - return core, err - } - - var blockLoader ingestion.BlockLoader - if exeNode.exeConf.enableStorehouse { - blockLoader = loader.NewUnfinalizedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState) - } else { - blockLoader = loader.NewUnexecutedLoader(node.Logger, node.State, node.Storage.Headers, exeNode.executionState) - } - - ingestionEng, err := ingestion.New( - exeNode.ingestionUnit, + _, core, err := ingestion.NewMachine( node.Logger, - node.EngineRegistry, + node.ProtocolEvents, + exeNode.collectionRequester, colFetcher, node.Storage.Headers, node.Storage.Blocks, node.Storage.Collections, - exeNode.computationManager, - exeNode.providerEngine, exeNode.executionState, + node.State, exeNode.collector, - node.Tracer, - exeNode.exeConf.extensiveLog, - exeNode.executionDataPruner, + exeNode.computationManager, + exeNode.providerEngine, exeNode.blockDataUploader, exeNode.stopControl, - blockLoader, ) - // TODO: we should solve these mutual dependencies better - // => https://github.com/dapperlabs/flow-go/issues/4360 - exeNode.collectionRequester.WithHandle(ingestionEng.OnCollection) - - node.ProtocolEvents.AddConsumer(ingestionEng) - - return ingestionEng, err + return core, err } // create scripts engine for handling script execution diff --git a/cmd/execution_config.go b/cmd/execution_config.go index c8ba7092c32..97f808ae6e7 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -69,11 +69,10 @@ type ExecutionConfig struct { // It works around an issue where some collection nodes are not configured with enough // this works around an issue where some collection nodes are not configured with enough // file descriptors causing connection failures. - onflowOnlyLNs bool - enableStorehouse bool - enableChecker bool - enableNewIngestionEngine bool - publicAccessID string + onflowOnlyLNs bool + enableStorehouse bool + enableChecker bool + publicAccessID string } func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { @@ -132,7 +131,9 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.BoolVar(&exeConf.onflowOnlyLNs, "temp-onflow-only-lns", false, "do not use unless required. forces node to only request collections from onflow collection nodes") flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false") flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true") - flags.BoolVar(&exeConf.enableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true") + // deprecated. Retain it to prevent nodes that previously had this configuration from crashing. + var deprecatedEnableNewIngestionEngine bool + flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true") flags.StringVar(&exeConf.publicAccessID, "public-access-id", "", "public access ID for the node") } diff --git a/engine/execution/ingestion/core.go b/engine/execution/ingestion/core.go index 95d68b30f05..62889ffc479 100644 --- a/engine/execution/ingestion/core.go +++ b/engine/execution/ingestion/core.go @@ -398,6 +398,14 @@ func (e *Core) onBlockExecuted( return nil } +func nonSystemTransactionCount(result flow.ExecutionResult) uint64 { + count := uint64(0) + for _, chunk := range result.Chunks { + count += chunk.NumberOfTransactions + } + return count +} + func (e *Core) onCollection(col *flow.Collection) error { colID := col.ID() e.log.Info(). diff --git a/engine/execution/ingestion/engine.go b/engine/execution/ingestion/engine.go deleted file mode 100644 index f3dfcaf1dd2..00000000000 --- a/engine/execution/ingestion/engine.go +++ /dev/null @@ -1,958 +0,0 @@ -package ingestion - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "github.com/onflow/flow-go/engine" - "github.com/onflow/flow-go/engine/execution" - "github.com/onflow/flow-go/engine/execution/computation" - "github.com/onflow/flow-go/engine/execution/ingestion/stop" - "github.com/onflow/flow-go/engine/execution/ingestion/uploader" - "github.com/onflow/flow-go/engine/execution/provider" - "github.com/onflow/flow-go/engine/execution/state" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module" - "github.com/onflow/flow-go/module/executiondatasync/execution_data" - "github.com/onflow/flow-go/module/executiondatasync/pruner" - "github.com/onflow/flow-go/module/mempool/entity" - "github.com/onflow/flow-go/module/mempool/queue" - "github.com/onflow/flow-go/module/mempool/stdmap" - "github.com/onflow/flow-go/module/trace" - "github.com/onflow/flow-go/network" - psEvents "github.com/onflow/flow-go/state/protocol/events" - "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/utils/logging" -) - -var _ execution_data.ProcessedHeightRecorder = (*Engine)(nil) - -// An Engine receives and saves incoming blocks. -type Engine struct { - psEvents.Noop // satisfy protocol events consumer interface - execution_data.ProcessedHeightRecorder - - unit *engine.Unit - log zerolog.Logger - collectionFetcher CollectionFetcher - headers storage.Headers // see comments on getHeaderByHeight for why we need it - blocks storage.Blocks - collections storage.Collections - computationManager computation.ComputationManager - providerEngine provider.ProviderEngine - mempool *Mempool - execState state.ExecutionState - metrics module.ExecutionMetrics - tracer module.Tracer - extensiveLogging bool - executionDataPruner *pruner.Pruner - uploader *uploader.Manager - stopControl *stop.StopControl - loader BlockLoader -} - -func New( - unit *engine.Unit, - logger zerolog.Logger, - net network.EngineRegistry, - collectionFetcher CollectionFetcher, - headers storage.Headers, - blocks storage.Blocks, - collections storage.Collections, - executionEngine computation.ComputationManager, - providerEngine provider.ProviderEngine, - execState state.ExecutionState, - metrics module.ExecutionMetrics, - tracer module.Tracer, - extLog bool, - pruner *pruner.Pruner, - uploader *uploader.Manager, - stopControl *stop.StopControl, - loader BlockLoader, -) (*Engine, error) { - log := logger.With().Str("engine", "ingestion").Logger() - - mempool := newMempool() - - eng := Engine{ - unit: unit, - log: log, - collectionFetcher: collectionFetcher, - headers: headers, - blocks: blocks, - collections: collections, - computationManager: executionEngine, - providerEngine: providerEngine, - mempool: mempool, - execState: execState, - metrics: metrics, - tracer: tracer, - extensiveLogging: extLog, - executionDataPruner: pruner, - uploader: uploader, - stopControl: stopControl, - loader: loader, - ProcessedHeightRecorder: execution_data.NewProcessedHeightRecorderManager(0), - } - - return &eng, nil -} - -// Ready returns a channel that will close when the engine has -// successfully started. -func (e *Engine) Ready() <-chan struct{} { - if e.stopControl.IsExecutionStopped() { - return e.unit.Ready() - } - - if err := e.uploader.RetryUploads(); err != nil { - e.log.Warn().Msg("failed to re-upload all ComputationResults") - } - - err := e.reloadUnexecutedBlocks() - if err != nil { - e.log.Fatal().Err(err).Msg("failed to load all unexecuted blocks") - } - - return e.unit.Ready() -} - -// Done returns a channel that will close when the engine has -// successfully stopped. -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return fmt.Errorf("ingestion error does not process local events") -} - -// on nodes startup, we need to load all the unexecuted blocks to the execution queues. -// blocks have to be loaded in the way that the parent has been loaded before loading its children -func (e *Engine) reloadUnexecutedBlocks() error { - unexecuted, err := e.loader.LoadUnexecuted(e.unit.Ctx()) - if err != nil { - return fmt.Errorf("could not load unexecuted blocks: %w", err) - } - // it's possible the BlockProcessable is called during the reloading, as the follower engine - // will receive blocks before ingestion engine is ready. - // The problem with that is, since the reloading hasn't finished yet, enqueuing the new block from - // the BlockProcessable callback will fail, because its parent block might have not been reloaded - // to the queues yet. - // So one solution here is to lock the execution queues during reloading, so that if BlockProcessable - // is called before reloading is finished, it will be blocked, which will avoid that edge case. - return e.mempool.Run(func( - blockByCollection *stdmap.BlockByCollectionBackdata, - executionQueues *stdmap.QueuesBackdata, - ) error { - for _, blockID := range unexecuted { - err := e.reloadBlock(blockByCollection, executionQueues, blockID) - if err != nil { - return fmt.Errorf("could not reload block: %v, %w", blockID, err) - } - - e.log.Debug().Hex("block_id", blockID[:]).Msg("reloaded block") - } - - e.log.Info().Int("count", len(unexecuted)).Msg("all unexecuted have been successfully reloaded") - - return nil - }) -} - -func (e *Engine) reloadBlock( - blockByCollection *stdmap.BlockByCollectionBackdata, - executionQueues *stdmap.QueuesBackdata, - blockID flow.Identifier, -) error { - block, err := e.blocks.ByID(blockID) - if err != nil { - return fmt.Errorf("could not get block by ID: %v %w", blockID, err) - } - - // enqueue the block and check if there is any missing collections - missingCollections, err := e.enqueueBlockAndCheckExecutable(blockByCollection, executionQueues, block, false) - - if err != nil { - return fmt.Errorf("could not enqueue block %x on reloading: %w", blockID, err) - } - - // forward the missing collections to requester engine for requesting them from collection nodes, - // adding the missing collections to mempool in order to trigger the block execution as soon as - // all missing collections are received. - err = e.fetchAndHandleCollection(blockID, block.Header.Height, missingCollections, func(collection *flow.Collection) error { - err := e.addCollectionToMempool(collection, blockByCollection) - - if err != nil { - return fmt.Errorf("could not add collection to mempool: %w", err) - } - return nil - }) - - if err != nil { - return fmt.Errorf("could not fetch or handle collection %w", err) - } - return nil -} - -// BlockProcessable handles the new verified blocks (blocks that -// have passed consensus validation) received from the consensus nodes -// NOTE: BlockProcessable might be called multiple times for the same block. -// NOTE: Ready calls reloadUnexecutedBlocks during initialization, which handles dropped protocol events. -func (e *Engine) BlockProcessable(b *flow.Header, _ *flow.QuorumCertificate) { - - // TODO: this should not be blocking: https://github.com/onflow/flow-go/issues/4400 - - // skip if stopControl tells to skip, so that we can avoid fetching collections - // for this block - if !e.stopControl.ShouldExecuteBlock(b.ID(), b.Height) { - return - } - - blockID := b.ID() - newBlock, err := e.blocks.ByID(blockID) - if err != nil { - e.log.Fatal().Err(err).Msgf("could not get incorporated block(%v): %v", blockID, err) - } - - e.log.Info().Hex("block_id", blockID[:]). - Uint64("height", b.Height). - Msg("handling new block") - - err = e.handleBlock(e.unit.Ctx(), newBlock) - if err != nil { - e.log.Error().Err(err).Hex("block_id", blockID[:]).Msg("failed to handle block") - } -} - -// Main handling - -// handle block will process the incoming block. -// the block has passed the consensus validation. -func (e *Engine) handleBlock(ctx context.Context, block *flow.Block) error { - - blockID := block.ID() - log := e.log.With().Hex("block_id", blockID[:]).Logger() - - span, _ := e.tracer.StartBlockSpan(ctx, blockID, trace.EXEHandleBlock) - defer span.End() - - executed, err := e.execState.IsBlockExecuted(block.Header.Height, blockID) - if err != nil { - return fmt.Errorf("could not check whether block is executed: %w", err) - } - - if executed { - log.Debug().Msg("block has been executed already") - return nil - } - - var missingCollections []*flow.CollectionGuarantee - // unexecuted block - // acquiring the lock so that there is only one process modifying the queue - err = e.mempool.Run(func( - blockByCollection *stdmap.BlockByCollectionBackdata, - executionQueues *stdmap.QueuesBackdata, - ) error { - missing, err := e.enqueueBlockAndCheckExecutable(blockByCollection, executionQueues, block, false) - if err != nil { - return err - } - missingCollections = missing - return nil - }) - - if err != nil { - return fmt.Errorf("could not enqueue block %v: %w", blockID, err) - } - - return e.addOrFetch(blockID, block.Header.Height, missingCollections) -} - -func (e *Engine) enqueueBlockAndCheckExecutable( - blockByCollection *stdmap.BlockByCollectionBackdata, - executionQueues *stdmap.QueuesBackdata, - block *flow.Block, - checkStateSync bool, -) ([]*flow.CollectionGuarantee, error) { - executableBlock := &entity.ExecutableBlock{ - Block: block, - CompleteCollections: make(map[flow.Identifier]*entity.CompleteCollection), - } - - blockID := executableBlock.ID() - - lg := e.log.With(). - Hex("block_id", blockID[:]). - Uint64("block_height", executableBlock.Block.Header.Height). - Logger() - - // adding the block to the queue, - queue, added, head := enqueue(executableBlock, executionQueues) - - // if it's not added, it means the block is not a new block, it already - // exists in the queue, then bail - if !added { - log.Debug().Hex("block_id", logging.Entity(executableBlock)). - Int("block_height", int(executableBlock.Height())). - Msg("block already exists in the execution queue") - return nil, nil - } - - firstUnexecutedHeight := queue.Head.Item.Height() - - // check if a block is executable. - // a block is executable if the following conditions are all true - // 1) the parent state commitment is ready - // 2) the collections for the block payload are ready - // 3) the child block is ready for querying the randomness - - // check if the block's parent has been executed. (we can't execute the block if the parent has - // not been executed yet) - // check if there is a statecommitment for the parent block - parentCommitment, err := e.execState.StateCommitmentByBlockID(block.Header.ParentID) - - // if we found the statecommitment for the parent block, then add it to the executable block. - if err == nil { - executableBlock.StartState = &parentCommitment - } else if errors.Is(err, storage.ErrNotFound) { - // the parent block is an unexecuted block. - // if the queue only has one block, and its parent doesn't - // exist in the queue, then we need to load the block from the storage. - _, ok := queue.Nodes[blockID] - if !ok { - lg.Error().Msgf("an unexecuted parent block is missing in the queue") - } - } else { - // if there is exception, then crash - lg.Fatal().Err(err).Msg("unexpected error while accessing storage, shutting down") - } - - // check if we have all the collections for the block, and request them if there is missing. - missingCollections, err := e.matchAndFindMissingCollections(executableBlock, blockByCollection) - if err != nil { - return nil, fmt.Errorf("cannot send collection requests: %w", err) - } - - complete := false - - // if newly enqueued block is inside any existing queue, we should skip now and wait - // for parent to finish execution - if head { - // execute the block if the block is ready to be executed - complete = e.executeBlockIfComplete(executableBlock) - } - - lg.Info(). - // if the execution is halt, but the queue keeps growing, we could check which block - // hasn't been executed. - Uint64("first_unexecuted_in_queue", firstUnexecutedHeight). - Bool("complete", complete). - Bool("head_of_queue", head). - Int("cols", len(executableBlock.Block.Payload.Guarantees)). - Int("missing_cols", len(missingCollections)). - Msg("block is enqueued") - - return missingCollections, nil -} - -// executeBlock will execute the block. -// When finish executing, it will check if the children becomes executable and execute them if yes. -func (e *Engine) executeBlock( - ctx context.Context, - executableBlock *entity.ExecutableBlock, -) { - - // don't execute the block if the stop control says no - if !e.stopControl.ShouldExecuteBlock(executableBlock.Block.Header.ID(), executableBlock.Block.Header.Height) { - return - } - - lg := e.log.With(). - Hex("block_id", logging.Entity(executableBlock)). - Uint64("height", executableBlock.Block.Header.Height). - Int("collections", len(executableBlock.CompleteCollections)). - Logger() - - lg.Info().Msg("executing block") - - startedAt := time.Now() - - span, ctx := e.tracer.StartSpanFromContext(ctx, trace.EXEExecuteBlock) - defer span.End() - - parentID := executableBlock.Block.Header.ParentID - parentErID, err := e.execState.GetExecutionResultID(ctx, parentID) - if err != nil { - lg.Err(err). - Str("parentID", parentID.String()). - Msg("could not get execution result ID for parent block") - return - } - - snapshot := e.execState.NewStorageSnapshot(*executableBlock.StartState, - executableBlock.Block.Header.ParentID, - executableBlock.Block.Header.Height-1, - ) - - computationResult, err := e.computationManager.ComputeBlock( - ctx, - parentErID, - executableBlock, - snapshot) - if err != nil { - lg.Err(err).Msg("error while computing block") - return - } - - wg := sync.WaitGroup{} - wg.Add(1) - defer wg.Wait() - - go func() { - defer wg.Done() - err := e.uploader.Upload(ctx, computationResult) - if err != nil { - lg.Err(err).Msg("error while uploading block") - // continue processing. uploads should not block execution - } - }() - - err = e.saveExecutionResults(ctx, computationResult) - if errors.Is(err, storage.ErrDataMismatch) { - lg.Fatal().Err(err).Msg("fatal: trying to store different results for the same block") - } - - if err != nil { - lg.Err(err).Msg("error while handing computation results") - return - } - - receipt := computationResult.ExecutionReceipt - broadcasted, err := e.providerEngine.BroadcastExecutionReceipt( - ctx, executableBlock.Block.Header.Height, receipt) - if err != nil { - lg.Err(err).Msg("critical: failed to broadcast the receipt") - } - - finalEndState := computationResult.CurrentEndState() - lg.Info(). - Hex("parent_block", executableBlock.Block.Header.ParentID[:]). - Int("collections", len(executableBlock.Block.Payload.Guarantees)). - Hex("start_state", executableBlock.StartState[:]). - Hex("final_state", finalEndState[:]). - Hex("receipt_id", logging.Entity(receipt)). - Hex("result_id", logging.Entity(receipt.ExecutionResult)). - Hex("execution_data_id", receipt.ExecutionResult.ExecutionDataID[:]). - Bool("state_changed", finalEndState != *executableBlock.StartState). - Uint64("num_txs", nonSystemTransactionCount(receipt.ExecutionResult)). - Bool("broadcasted", broadcasted). - Int64("timeSpentInMS", time.Since(startedAt).Milliseconds()). - Msg("block executed") - - e.stopControl.OnBlockExecuted(executableBlock.Block.Header) - - err = e.onBlockExecuted(executableBlock, finalEndState) - if err != nil { - lg.Err(err).Msg("failed in process block's children") - } - - if e.executionDataPruner != nil { - e.OnBlockProcessed(executableBlock.Height()) - } - - e.unit.Ctx() - -} - -func nonSystemTransactionCount(result flow.ExecutionResult) uint64 { - count := uint64(0) - for _, chunk := range result.Chunks { - count += chunk.NumberOfTransactions - } - return count -} - -// we've executed the block, now we need to check: -// 1. whether the state syncing can be turned off -// 2. whether its children can be executed -// the executionQueues stores blocks as a tree: -// -// 10 <- 11 <- 12 -// ^-- 13 -// 14 <- 15 <- 16 -// -// if block 10 is the one just executed, then we will remove it from the queue, and add -// its children back, meaning the tree will become: -// -// 11 <- 12 -// 13 -// 14 <- 15 <- 16 - -func (e *Engine) onBlockExecuted( - executed *entity.ExecutableBlock, - finalState flow.StateCommitment, -) error { - - e.metrics.ExecutionStorageStateCommitment(int64(len(finalState))) - e.metrics.ExecutionLastExecutedBlockHeight(executed.Block.Header.Height) - - missingCollections := make(map[*entity.ExecutableBlock][]*flow.CollectionGuarantee) - err := e.mempool.Run( - func( - blockByCollection *stdmap.BlockByCollectionBackdata, - executionQueues *stdmap.QueuesBackdata, - ) error { - // find the block that was just executed - executionQueue, exists := executionQueues.ByID(executed.ID()) - if !exists { - logQueueState(e.log, executionQueues, executed.ID()) - // when the block no longer exists in the queue, it means there was a race condition that - // two onBlockExecuted was called for the same block, and one process has already removed the - // block from the queue, so we will print an error here - return fmt.Errorf("block has been executed already, no longer exists in the queue") - } - - // dismount the executed block and all its children - _, newQueues := executionQueue.Dismount() - - // go through each children, add them back to the queue, and check - // if the children is executable - for _, queue := range newQueues { - queueID := queue.ID() - added := executionQueues.Add(queueID, queue) - if !added { - // blocks should be unique in execution queues, if we dismount all the children blocks, then - // add it back to the queues, then it should always be able to add. - // If not, then there is a bug that the queues have duplicated blocks - return fmt.Errorf("fatal error - child block already in execution queue") - } - - // the parent block has been executed, update the StartState of - // each child block. - child := queue.Head.Item.(*entity.ExecutableBlock) - child.StartState = &finalState - - missing, err := e.matchAndFindMissingCollections(child, blockByCollection) - if err != nil { - return fmt.Errorf("cannot send collection requests: %w", err) - } - if len(missing) > 0 { - missingCollections[child] = append(missingCollections[child], missing...) - } - - completed := e.executeBlockIfComplete(child) - if !completed { - e.log.Debug(). - Hex("executed_block", logging.Entity(executed)). - Hex("child_block", logging.Entity(child)). - Msg("child block is not ready to be executed yet") - } else { - e.log.Debug(). - Hex("executed_block", logging.Entity(executed)). - Hex("child_block", logging.Entity(child)). - Msg("child block is ready to be executed") - } - } - - // remove the executed block - executionQueues.Remove(executed.ID()) - - return nil - }) - - if err != nil { - e.log.Fatal().Err(err). - Hex("block", logging.Entity(executed)). - Uint64("height", executed.Block.Header.Height). - Msg("error while requeueing blocks after execution") - } - - for child, missing := range missingCollections { - err := e.addOrFetch(child.ID(), child.Block.Header.Height, missing) - if err != nil { - return fmt.Errorf("fail to add missing collections: %w", err) - } - } - - return nil -} - -// executeBlockIfComplete checks whether the block is ready to be executed. -// if yes, execute the block -// return a bool indicates whether the block was completed -func (e *Engine) executeBlockIfComplete(eb *entity.ExecutableBlock) bool { - - if eb.Executing { - return false - } - - // if don't have the delta, then check if everything is ready for executing - // the block - if eb.IsComplete() { - - if e.extensiveLogging { - e.logExecutableBlock(eb) - } - - // no external synchronisation is used because this method must be run in a thread-safe context - eb.Executing = true - - e.unit.Launch(func() { - e.executeBlock(e.unit.Ctx(), eb) - }) - return true - } - return false -} - -// OnCollection is a callback for handling the collections requested by the -// collection requester. -func (e *Engine) OnCollection(originID flow.Identifier, entity flow.Entity) { - // convert entity to strongly typed collection - collection, ok := entity.(*flow.Collection) - if !ok { - e.log.Error().Msgf("invalid entity type (%T)", entity) - return - } - - // no need to validate the origin ID, since the collection requester has - // checked the origin must be a collection node. - - err := e.handleCollection(originID, collection) - if err != nil { - e.log.Error().Err(err).Msg("could not handle collection") - } -} - -// a block can't be executed if its collection is missing. -// since a collection can belong to multiple blocks, we need to -// find all the blocks that are needing this collection, and then -// check if any of these block becomes executable and execute it if -// is. -func (e *Engine) handleCollection( - originID flow.Identifier, - collection *flow.Collection, -) error { - collID := collection.ID() - - span, _ := e.tracer.StartCollectionSpan(context.Background(), collID, trace.EXEHandleCollection) - defer span.End() - - lg := e.log.With().Hex("collection_id", collID[:]).Logger() - - lg.Info().Hex("sender", originID[:]).Int("len", collection.Len()).Msg("handle collection") - defer func(startTime time.Time) { - lg.Info().TimeDiff("duration", time.Now(), startTime).Msg("collection handled") - }(time.Now()) - - // TODO: bail if have seen this collection before. - err := e.collections.Store(collection) - if err != nil { - return fmt.Errorf("cannot store collection: %w", err) - } - - return e.mempool.BlockByCollection.Run( - func(backdata *stdmap.BlockByCollectionBackdata) error { - return e.addCollectionToMempool(collection, backdata) - }, - ) -} - -func (e *Engine) addCollectionToMempool( - collection *flow.Collection, - backdata *stdmap.BlockByCollectionBackdata, -) error { - collID := collection.ID() - blockByCollectionID, exists := backdata.ByID(collID) - - // if we don't find any block for this collection, then - // means we don't need this collection any more. - // or it was ejected from the mempool when it was full. - // either way, we will return - if !exists { - return nil - } - - for _, executableBlock := range blockByCollectionID.ExecutableBlocks { - blockID := executableBlock.ID() - - completeCollection, ok := executableBlock.CompleteCollections[collID] - if !ok { - return fmt.Errorf("cannot handle collection: internal inconsistency - collection pointing to block %v which does not contain said collection", - blockID) - } - - e.metrics.UpdateCollectionMaxHeight(executableBlock.Block.Header.Height) - - if completeCollection.IsCompleted() { - // already received transactions for this collection - continue - } - - // update the transactions of the collection - // Note: it's guaranteed the transactions are for this collection, because - // the collection id matches with the CollectionID from the collection guarantee - completeCollection.Transactions = collection.Transactions - - // check if the block becomes executable - _ = e.executeBlockIfComplete(executableBlock) - } - - // since we've received this collection, remove it from the index - // this also prevents from executing the same block twice, because the second - // time when the collection arrives, it will not be found in the blockByCollectionID - // index. - backdata.Remove(collID) - - return nil -} - -func newQueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) ( - *queue.Queue, - bool, -) { - q := queue.NewQueue(blockify) - qID := q.ID() - return q, queues.Add(qID, q) -} - -// enqueue adds a block to the queues, return the queue that includes the block and booleans -// * is block new one (it's not already enqueued, not a duplicate) -// * is head of the queue (new queue has been created) -// -// Queues are chained blocks. Since a block can't be executable until its parent has been -// executed, the chained structure allows us to only check the head of each queue to see if -// any block becomes executable. -// for instance we have one queue whose head is A: -// -// A <- B <- C -// ^- D <- E -// -// If we receive E <- F, then we will add it to the queue: -// -// A <- B <- C -// ^- D <- E <- F -// -// Even through there are 6 blocks, we only need to check if block A becomes executable. -// when the parent block isn't in the queue, we add it as a new queue. for instance, if -// we receive H <- G, then the queues will become: -// -// A <- B <- C -// ^- D <- E -// G -func enqueue(blockify queue.Blockify, queues *stdmap.QueuesBackdata) ( - *queue.Queue, - bool, - bool, -) { - for _, queue := range queues.All() { - if stored, isNew := queue.TryAdd(blockify); stored { - return queue, isNew, false - } - } - queue, isNew := newQueue(blockify, queues) - return queue, isNew, true -} - -// check if the block's collections have been received, -// if yes, add the collection to the executable block -// if no, fetch the collection. -// if a block has 3 collection, it would be 3 reqs to fetch them. -// mark the collection belongs to the block, -// mark the block contains this collection. -// It returns the missing collections to be fetched -// TODO: to rename -func (e *Engine) matchAndFindMissingCollections( - executableBlock *entity.ExecutableBlock, - collectionsBackdata *stdmap.BlockByCollectionBackdata, -) ([]*flow.CollectionGuarantee, error) { - missingCollections := make([]*flow.CollectionGuarantee, 0, len(executableBlock.Block.Payload.Guarantees)) - - for _, guarantee := range executableBlock.Block.Payload.Guarantees { - coll := &entity.CompleteCollection{ - Guarantee: guarantee, - } - executableBlock.CompleteCollections[guarantee.ID()] = coll - - // check if we have requested this collection before. - // blocksNeedingCollection stores all the blocks that contain this collection - - if blocksNeedingCollection, exists := collectionsBackdata.ByID(guarantee.ID()); exists { - // if we've requested this collection, it means other block might also contain this collection. - // in this case, add this block to the map so that when the collection is received, - // we could update the executable block - blocksNeedingCollection.ExecutableBlocks[executableBlock.ID()] = executableBlock - - // since the collection is still being requested, we don't have the transactions - // yet, so exit - continue - } - - // the storage doesn't have this collection, meaning this is our first time seeing this - // collection guarantee, create an entry to store in collectionsBackdata in order to - // update the executable blocks when the collection is received. - blocksNeedingCollection := &entity.BlocksByCollection{ - CollectionID: guarantee.ID(), - ExecutableBlocks: map[flow.Identifier]*entity.ExecutableBlock{executableBlock.ID(): executableBlock}, - } - - added := collectionsBackdata.Add(blocksNeedingCollection.ID(), blocksNeedingCollection) - if !added { - // sanity check, should not happen, unless mempool implementation has a bug - return nil, fmt.Errorf("collection already mapped to block") - } - - missingCollections = append(missingCollections, guarantee) - } - - return missingCollections, nil -} - -// save the execution result of a block -func (e *Engine) saveExecutionResults( - ctx context.Context, - result *execution.ComputationResult, -) error { - span, childCtx := e.tracer.StartSpanFromContext(ctx, trace.EXESaveExecutionResults) - defer span.End() - - e.log.Debug(). - Hex("block_id", logging.Entity(result.ExecutableBlock)). - Msg("received computation result") - - for _, event := range result.ExecutionResult.ServiceEvents { - e.log.Info(). - Uint64("block_height", result.ExecutableBlock.Height()). - Hex("block_id", logging.Entity(result.ExecutableBlock)). - Str("event_type", event.Type.String()). - Msg("service event emitted") - } - - err := e.execState.SaveExecutionResults(childCtx, result) - if err != nil { - return fmt.Errorf("cannot persist execution state: %w", err) - } - - finalEndState := result.CurrentEndState() - e.log.Debug(). - Hex("block_id", logging.Entity(result.ExecutableBlock)). - Hex("start_state", result.ExecutableBlock.StartState[:]). - Hex("final_state", finalEndState[:]). - Msg("saved computation results") - - return nil -} - -// logExecutableBlock logs all data about an executable block -// over time we should skip this -func (e *Engine) logExecutableBlock(eb *entity.ExecutableBlock) { - // log block - e.log.Debug(). - Hex("block_id", logging.Entity(eb)). - Hex("prev_block_id", logging.ID(eb.Block.Header.ParentID)). - Uint64("block_height", eb.Block.Header.Height). - Int("number_of_collections", len(eb.Collections())). - RawJSON("block_header", logging.AsJSON(eb.Block.Header)). - Msg("extensive log: block header") - - // logs transactions - for i, col := range eb.Collections() { - for j, tx := range col.Transactions { - e.log.Debug(). - Hex("block_id", logging.Entity(eb)). - Int("block_height", int(eb.Block.Header.Height)). - Hex("prev_block_id", logging.ID(eb.Block.Header.ParentID)). - Int("collection_index", i). - Int("tx_index", j). - Hex("collection_id", logging.ID(col.Guarantee.CollectionID)). - Hex("tx_hash", logging.Entity(tx)). - Hex("start_state_commitment", eb.StartState[:]). - RawJSON("transaction", logging.AsJSON(tx)). - Msg("extensive log: executed tx content") - } - } -} - -// addOrFetch checks if there are stored collections for the given guarantees, if there is, -// forward them to mempool to process the collection, otherwise fetch the collections. -// any error returned are exception -func (e *Engine) addOrFetch( - blockID flow.Identifier, - height uint64, - guarantees []*flow.CollectionGuarantee, -) error { - return e.fetchAndHandleCollection(blockID, height, guarantees, func(collection *flow.Collection) error { - err := e.mempool.BlockByCollection.Run( - func(backdata *stdmap.BlockByCollectionBackdata) error { - return e.addCollectionToMempool(collection, backdata) - }) - - if err != nil { - return fmt.Errorf("could not add collection to mempool: %w", err) - } - return nil - }) -} - -// addOrFetch checks if there are stored collections for the given guarantees, if there is, -// forward them to the handler to process the collection, otherwise fetch the collections. -// any error returned are exception -func (e *Engine) fetchAndHandleCollection( - blockID flow.Identifier, - height uint64, - guarantees []*flow.CollectionGuarantee, - handleCollection func(*flow.Collection) error, -) error { - fetched := false - for _, guarantee := range guarantees { - // if we've requested this collection, we will store it in the storage, - // so check the storage to see whether we've seen it. - collection, err := e.collections.ByID(guarantee.CollectionID) - - if err == nil { - // we found the collection from storage, forward this collection to handler - err = handleCollection(collection) - if err != nil { - return fmt.Errorf("could not handle collection: %w", err) - } - - continue - } - - // check if there was exception - if !errors.Is(err, storage.ErrNotFound) { - return fmt.Errorf("error while querying for collection: %w", err) - } - - err = e.collectionFetcher.FetchCollection(blockID, height, guarantee) - if err != nil { - return fmt.Errorf("could not fetch collection: %w", err) - } - fetched = true - } - - // make sure that the requests are dispatched immediately by the requester - if fetched { - e.collectionFetcher.Force() - e.metrics.ExecutionCollectionRequestSent() - } - - return nil -} - -func logQueueState(log zerolog.Logger, queues *stdmap.QueuesBackdata, blockID flow.Identifier) { - all := queues.All() - - log.With().Hex("queue_state__executed_block_id", blockID[:]).Int("count", len(all)).Logger() - for i, queue := range all { - log.Error().Msgf("%v-th queue state: %v", i, queue.String()) - } -} diff --git a/engine/execution/ingestion/engine_test.go b/engine/execution/ingestion/engine_test.go deleted file mode 100644 index b7d5a3665d6..00000000000 --- a/engine/execution/ingestion/engine_test.go +++ /dev/null @@ -1,827 +0,0 @@ -package ingestion - -import ( - "context" - "crypto/rand" - "fmt" - "sync" - "testing" - "time" - - "github.com/onflow/crypto" - "github.com/rs/zerolog" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - enginePkg "github.com/onflow/flow-go/engine" - "github.com/onflow/flow-go/engine/execution" - computation "github.com/onflow/flow-go/engine/execution/computation/mock" - "github.com/onflow/flow-go/engine/execution/ingestion/loader" - "github.com/onflow/flow-go/engine/execution/ingestion/mocks" - "github.com/onflow/flow-go/engine/execution/ingestion/stop" - "github.com/onflow/flow-go/engine/execution/ingestion/uploader" - uploadermock "github.com/onflow/flow-go/engine/execution/ingestion/uploader/mock" - provider "github.com/onflow/flow-go/engine/execution/provider/mock" - stateMock "github.com/onflow/flow-go/engine/execution/state/mock" - "github.com/onflow/flow-go/fvm/storage/snapshot" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/mempool/entity" - "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/module/trace" - "github.com/onflow/flow-go/network/mocknetwork" - protocol "github.com/onflow/flow-go/state/protocol/mock" - storageerr "github.com/onflow/flow-go/storage" - storage "github.com/onflow/flow-go/storage/mock" - "github.com/onflow/flow-go/utils/unittest" -) - -type testingContext struct { - t *testing.T - engine *Engine - headers *storage.Headers - blocks *storage.Blocks - collections *mocks.MockCollectionStore - state *protocol.State - computationManager *computation.ComputationManager - providerEngine *provider.ProviderEngine - executionState *stateMock.ExecutionState - stopControl *stop.StopControl - uploadMgr *uploader.Manager - fetcher *mocks.MockFetcher - - mu *sync.Mutex -} - -func runWithEngine(t *testing.T, f func(testingContext)) { - - net := new(mocknetwork.EngineRegistry) - - // generates signing identity including staking key for signing - seed := make([]byte, crypto.KeyGenSeedMinLen) - n, err := rand.Read(seed) - require.Equal(t, n, crypto.KeyGenSeedMinLen) - require.NoError(t, err) - sk, err := crypto.GeneratePrivateKey(crypto.BLSBLS12381, seed) - require.NoError(t, err) - myIdentity := unittest.IdentityFixture() - myIdentity.Role = flow.RoleExecution - myIdentity.StakingPubKey = sk.PublicKey() - - headers := storage.NewHeaders(t) - blocks := storage.NewBlocks(t) - collections := mocks.NewMockCollectionStore() - - computationManager := computation.NewComputationManager(t) - providerEngine := provider.NewProviderEngine(t) - protocolState := protocol.NewState(t) - executionState := stateMock.NewExecutionState(t) - - var engine *Engine - - defer func() { - unittest.AssertClosesBefore(t, engine.Done(), 5*time.Second, "expect to stop before timeout") - computationManager.AssertExpectations(t) - protocolState.AssertExpectations(t) - executionState.AssertExpectations(t) - providerEngine.AssertExpectations(t) - }() - - log := unittest.Logger() - metrics := metrics.NewNoopCollector() - - tracer, err := trace.NewTracer(log, "test", "test", trace.SensitivityCaptureAll) - require.NoError(t, err) - - unit := enginePkg.NewUnit() - stopControl := stop.NewStopControl( - unit, - time.Second, - zerolog.Nop(), - executionState, - headers, - nil, - nil, - &flow.Header{Height: 1}, - false, - false, - ) - - uploadMgr := uploader.NewManager(trace.NewNoopTracer()) - - fetcher := mocks.NewMockFetcher() - loader := loader.NewUnexecutedLoader(log, protocolState, headers, executionState) - - engine, err = New( - unit, - log, - net, - fetcher, - headers, - blocks, - collections, - computationManager, - providerEngine, - executionState, - metrics, - tracer, - false, - nil, - uploadMgr, - stopControl, - loader, - ) - require.NoError(t, err) - - f(testingContext{ - t: t, - engine: engine, - headers: headers, - blocks: blocks, - collections: collections, - state: protocolState, - computationManager: computationManager, - providerEngine: providerEngine, - executionState: executionState, - uploadMgr: uploadMgr, - stopControl: stopControl, - fetcher: fetcher, - - mu: &sync.Mutex{}, - }) - - <-engine.Done() -} - -// TestExecuteOneBlock verifies after collection is received, -// block is executed, uploaded, and broadcasted -func TestExecuteOneBlock(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - // create a mocked storage that has similar behavior as the real execution state. - // the mocked storage allows us to prepare results for the prepared blocks, so that - // the mocked methods know what to return, and it also allows us to verify that the - // mocked API is called with correct data. - store := mocks.NewMockBlockStore(t) - - col := unittest.CollectionFixture(1) - // Root <- A - blockA := makeBlockWithCollection(store.RootBlock, &col) - result := store.CreateBlockAndMockResult(t, blockA) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(result) - - // receive block - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - wg := sync.WaitGroup{} - wg.Add(1) // wait for block A to be executed - - ctx.mockComputeBlock(store) - ctx.mockSaveExecutionResults(store, &wg) - - // verify upload will be called - uploader := uploadermock.NewUploader(ctx.t) - uploader.On("Upload", result).Return(nil).Once() - ctx.uploadMgr.AddUploader(uploader) - - // verify broadcast will be called - ctx.providerEngine.On("BroadcastExecutionReceipt", - mock.Anything, - blockA.Block.Header.Height, - result.ExecutionReceipt).Return(true, nil).Once() - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // verify collection is fetched - require.True(t, ctx.fetcher.IsFetched(col.ID())) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - }) -} - -// verify block will be executed if collection is received first -func TestExecuteBlocks(t *testing.T) { - - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - col1 := unittest.CollectionFixture(1) - col2 := unittest.CollectionFixture(1) - // Root <- A[C1] <- B[C2] - // prepare two blocks, so that receiving C2 before C1 won't trigger any block to be executed, - // which creates the case where C2 collection is received first, and block B will become - // executable as soon as its parent block A is executed. - blockA := makeBlockWithCollection(store.RootBlock, &col1) - blockB := makeBlockWithCollection(blockA.Block.Header, &col2) - resultA := store.CreateBlockAndMockResult(t, blockA) - resultB := store.CreateBlockAndMockResult(t, blockB) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - ctx.mockNewStorageSnapshot(resultB) - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - - // receive block - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(2) // wait for 2 blocks to be executed - ctx.mockSaveExecutionResults(store, &wg) - - require.NoError(t, ctx.engine.handleCollection(unittest.IdentifierFixture(), &col2)) - require.NoError(t, ctx.engine.handleCollection(unittest.IdentifierFixture(), &col1)) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // verify collection is fetched - require.True(t, ctx.fetcher.IsFetched(col1.ID())) - require.True(t, ctx.fetcher.IsFetched(col2.ID())) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertExecuted(t, "B", blockB.ID()) - }) -} - -// verify block will be executed if collection is already in storage -func TestExecuteNextBlockIfCollectionIsReady(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - col1 := unittest.CollectionFixture(1) - col2 := unittest.CollectionFixture(1) - - // Root <- A[C1] <- B[C2] - blockA := makeBlockWithCollection(store.RootBlock, &col1) - blockB := makeBlockWithCollection(blockA.Block.Header, &col2) - resultA := store.CreateBlockAndMockResult(t, blockA) - resultB := store.CreateBlockAndMockResult(t, blockB) - - // C2 is available in storage - require.NoError(t, ctx.collections.Store(&col2)) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - ctx.mockNewStorageSnapshot(resultB) - - // receiving block A and B will not trigger any execution - // because A is missing collection C1, B is waiting for A to be executed - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(2) // waiting for A and B to be executed - ctx.mockSaveExecutionResults(store, &wg) - - // receiving collection C1 will execute both A and B - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col1) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // verify collection is fetched - require.True(t, ctx.fetcher.IsFetched(col1.ID())) - require.False(t, ctx.fetcher.IsFetched(col2.ID())) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertExecuted(t, "B", blockB.ID()) - }) -} - -// verify block will only be executed once even if block or collection are received multiple times -func TestExecuteBlockOnlyOnce(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - col := unittest.CollectionFixture(1) - // Root <- A[C] - blockA := makeBlockWithCollection(store.RootBlock, &col) - resultA := store.CreateBlockAndMockResult(t, blockA) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - - // receive block - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - // receive block again before collection is received - err = ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(1) // wait for block A to be executed - ctx.mockSaveExecutionResults(store, &wg) - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col) - require.NoError(t, err) - - // receiving collection again before block is executed - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // receiving collection again after block is executed - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col) - require.NoError(t, err) - - // verify collection is fetched - require.True(t, ctx.fetcher.IsFetched(col.ID())) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - }) -} - -// given two blocks depend on the same root block and contain same collections, -// receiving all collections will trigger the execution of both blocks concurrently -func TestExecuteForkConcurrently(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - // create A and B that have the same collections and same parent - // Root <- A[C1, C2] - // <- B[C1, C2] - col1 := unittest.CollectionFixture(1) - col2 := unittest.CollectionFixture(1) - - blockA := makeBlockWithCollection(store.RootBlock, &col1, &col2) - blockB := makeBlockWithCollection(store.RootBlock, &col1, &col2) - resultA := store.CreateBlockAndMockResult(t, blockA) - resultB := store.CreateBlockAndMockResult(t, blockB) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - ctx.mockNewStorageSnapshot(resultB) - - // receive blocks - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col1) - require.NoError(t, err) - - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(2) // wait for A and B to be executed - ctx.mockSaveExecutionResults(store, &wg) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col2) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertExecuted(t, "B", blockB.ID()) - }) -} - -// verify block will be executed in order -func TestExecuteBlockInOrder(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - // create A and B that have the same collections and same parent - // Root <- A[C1, C2] - // <- B[C2] <- C[C3] - // verify receiving C3, C1, then C2 will trigger all blocks to be executed - col1 := unittest.CollectionFixture(1) - col2 := unittest.CollectionFixture(1) - col3 := unittest.CollectionFixture(1) - - blockA := makeBlockWithCollection(store.RootBlock, &col1, &col2) - blockB := makeBlockWithCollection(store.RootBlock, &col2) - blockC := makeBlockWithCollection(store.RootBlock, &col3) - resultA := store.CreateBlockAndMockResult(t, blockA) - resultB := store.CreateBlockAndMockResult(t, blockB) - resultC := store.CreateBlockAndMockResult(t, blockC) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - ctx.mockNewStorageSnapshot(resultB) - ctx.mockNewStorageSnapshot(resultC) - - // receive blocks - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockC.Block) - require.NoError(t, err) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col3) - require.NoError(t, err) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col1) - require.NoError(t, err) - - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(3) // waiting for A, B, C to be executed - ctx.mockSaveExecutionResults(store, &wg) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col2) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertExecuted(t, "B", blockB.ID()) - store.AssertExecuted(t, "C", blockC.ID()) - }) -} - -func logBlocks(blocks map[string]*entity.ExecutableBlock) { - log := unittest.Logger() - for name, b := range blocks { - log.Debug().Msgf("creating blocks for testing, block %v's ID:%v", name, b.ID()) - } -} - -// verify that when blocks above the stop height are finalized, they won't -// be executed -func TestStopAtHeightWhenFinalizedBeforeExecuted(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - // this collection is used as trigger of execution - executionTrigger := unittest.CollectionFixture(1) - blockA := makeBlockWithCollection(store.RootBlock, &executionTrigger) - blockB := makeBlockWithCollection(blockA.Block.Header) - blockC := makeBlockWithCollection(blockB.Block.Header) - blockD := makeBlockWithCollection(blockC.Block.Header) - - resultA := store.CreateBlockAndMockResult(t, blockA) - resultB := store.CreateBlockAndMockResult(t, blockB) - store.CreateBlockAndMockResult(t, blockC) - store.CreateBlockAndMockResult(t, blockD) - - stopHeight := store.RootBlock.Height + 3 - require.Equal(t, stopHeight, blockC.Block.Header.Height) // stop at C (C will not be executed) - err := ctx.stopControl.SetStopParameters(stop.StopParameters{ - StopBeforeHeight: stopHeight, - }) - require.NoError(t, err) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - ctx.mockNewStorageSnapshot(resultB) - - // receive blocks - err = ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockC.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockD.Block) - require.NoError(t, err) - - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(2) // only 2 blocks (A, B) will be executed - ctx.mockSaveExecutionResults(store, &wg) - - // all blocks finalized - ctx.stopControl.BlockFinalizedForTesting(blockA.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockB.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockC.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockD.Block.Header) - - // receiving the colleciton to trigger all blocks to be executed - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &executionTrigger) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // since stop height is C, verify that only A and B are executed, C and D are not executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertExecuted(t, "B", blockB.ID()) - - store.AssertNotExecuted(t, "C", blockC.ID()) - store.AssertNotExecuted(t, "D", blockD.ID()) - }) -} - -// verify that blocks above the stop height won't be executed, even if they are -// later they got finalized -func TestStopAtHeightWhenExecutedBeforeFinalized(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - blockA := makeBlockWithCollection(store.RootBlock) - blockB := makeBlockWithCollection(blockA.Block.Header) - blockC := makeBlockWithCollection(blockB.Block.Header) - blockD := makeBlockWithCollection(blockC.Block.Header) - - resultA := store.CreateBlockAndMockResult(t, blockA) - resultB := store.CreateBlockAndMockResult(t, blockB) - store.CreateBlockAndMockResult(t, blockC) - store.CreateBlockAndMockResult(t, blockD) - - stopHeight := store.RootBlock.Height + 3 - require.Equal(t, stopHeight, blockC.Block.Header.Height) // stop at C (C will not be executed) - err := ctx.stopControl.SetStopParameters(stop.StopParameters{ - StopBeforeHeight: stopHeight, - }) - require.NoError(t, err) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - ctx.mockNewStorageSnapshot(resultB) - - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(2) // waiting for only A, B to be executed - ctx.mockSaveExecutionResults(store, &wg) - - // receive blocks - err = ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockC.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockD.Block) - require.NoError(t, err) - - // all blocks finalized - ctx.stopControl.BlockFinalizedForTesting(blockA.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockB.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockC.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockD.Block.Header) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // since stop height is C, verify that only A and B are executed, C and D are not executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertExecuted(t, "B", blockB.ID()) - - store.AssertNotExecuted(t, "C", blockC.ID()) - store.AssertNotExecuted(t, "D", blockD.ID()) - }) -} - -// verify that when blocks execution and finalization happen concurrently -func TestStopAtHeightWhenExecutionFinalization(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - // Root <- A <- B (stop height, won't execute) <- C - // verify when executing A and finalizing B happens concurrently, - // still won't allow B and C to be executed - blockA := makeBlockWithCollection(store.RootBlock) - blockB := makeBlockWithCollection(blockA.Block.Header) - blockC := makeBlockWithCollection(blockB.Block.Header) - - resultA := store.CreateBlockAndMockResult(t, blockA) - store.CreateBlockAndMockResult(t, blockB) - store.CreateBlockAndMockResult(t, blockC) - - err := ctx.stopControl.SetStopParameters(stop.StopParameters{ - StopBeforeHeight: blockB.Block.Header.Height, - }) - require.NoError(t, err) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(resultA) - - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - // waiting for: - // 1. A, B, C to be handled - // 2. A, B, C to be finalized - // 3. only A to be executed - wg.Add(3) - ctx.mockSaveExecutionResults(store, &wg) - - // receive blocks - go func(wg *sync.WaitGroup) { - err = ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockB.Block) - require.NoError(t, err) - - err = ctx.engine.handleBlock(context.Background(), blockC.Block) - require.NoError(t, err) - wg.Done() - }(&wg) - - go func(wg *sync.WaitGroup) { - // all blocks finalized - ctx.stopControl.BlockFinalizedForTesting(blockA.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockB.Block.Header) - ctx.stopControl.BlockFinalizedForTesting(blockC.Block.Header) - wg.Done() - }(&wg) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // since stop height is C, verify that only A and B are executed, C and D are not executed - store.AssertExecuted(t, "A", blockA.ID()) - store.AssertNotExecuted(t, "B", blockB.ID()) - store.AssertNotExecuted(t, "C", blockC.ID()) - }) -} - -// TestExecutedBlockUploadedFailureDoesntBlock tests that block processing continues even the -// uploader fails with an error -func TestExecutedBlockUploadedFailureDoesntBlock(t *testing.T) { - runWithEngine(t, func(ctx testingContext) { - store := mocks.NewMockBlockStore(t) - - col := unittest.CollectionFixture(1) - // Root <- A - blockA := makeBlockWithCollection(store.RootBlock, &col) - result := store.CreateBlockAndMockResult(t, blockA) - - ctx.mockIsBlockExecuted(store) - ctx.mockStateCommitmentByBlockID(store) - ctx.mockGetExecutionResultID(store) - ctx.mockNewStorageSnapshot(result) - - // receive block - err := ctx.engine.handleBlock(context.Background(), blockA.Block) - require.NoError(t, err) - - ctx.mockComputeBlock(store) - wg := sync.WaitGroup{} - wg.Add(1) // wait for block A to be executed - ctx.mockSaveExecutionResults(store, &wg) - - // verify upload will fail - uploader1 := uploadermock.NewUploader(ctx.t) - uploader1.On("Upload", result).Return(fmt.Errorf("error uploading")).Once() - ctx.uploadMgr.AddUploader(uploader1) - - // verify broadcast will be called - ctx.providerEngine.On("BroadcastExecutionReceipt", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - - err = ctx.engine.handleCollection(unittest.IdentifierFixture(), &col) - require.NoError(t, err) - - unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second) - - // verify collection is fetched - require.True(t, ctx.fetcher.IsFetched(col.ID())) - - // verify block is executed - store.AssertExecuted(t, "A", blockA.ID()) - }) -} - -func makeCollection() (*flow.Collection, *flow.CollectionGuarantee) { - col := unittest.CollectionFixture(1) - gua := col.Guarantee() - return &col, &gua -} - -func makeBlockWithCollection(parent *flow.Header, cols ...*flow.Collection) *entity.ExecutableBlock { - block := unittest.BlockWithParentFixture(parent) - completeCollections := make(map[flow.Identifier]*entity.CompleteCollection, len(block.Payload.Guarantees)) - for _, col := range cols { - g := col.Guarantee() - block.Payload.Guarantees = append(block.Payload.Guarantees, &g) - - cc := &entity.CompleteCollection{ - Guarantee: &g, - Transactions: col.Transactions, - } - completeCollections[col.ID()] = cc - } - block.Header.PayloadHash = block.Payload.Hash() - - executableBlock := &entity.ExecutableBlock{ - Block: block, - CompleteCollections: completeCollections, - StartState: unittest.StateCommitmentPointerFixture(), - } - return executableBlock -} - -func (ctx *testingContext) mockIsBlockExecuted(store *mocks.MockBlockStore) { - ctx.executionState.On("IsBlockExecuted", mock.Anything, mock.Anything). - Return(func(height uint64, blockID flow.Identifier) (bool, error) { - _, err := store.GetExecuted(blockID) - if err != nil { - return false, nil - } - return true, nil - }) -} - -func (ctx *testingContext) mockStateCommitmentByBlockID(store *mocks.MockBlockStore) { - ctx.executionState.On("StateCommitmentByBlockID", mock.Anything). - Return(func(blockID flow.Identifier) (flow.StateCommitment, error) { - result, err := store.GetExecuted(blockID) - if err != nil { - return flow.StateCommitment{}, storageerr.ErrNotFound - } - return result.Result.CurrentEndState(), nil - }) -} - -func (ctx *testingContext) mockGetExecutionResultID(store *mocks.MockBlockStore) { - ctx.executionState.On("GetExecutionResultID", mock.Anything, mock.Anything). - Return(func(ctx context.Context, blockID flow.Identifier) (flow.Identifier, error) { - blockResult, err := store.GetExecuted(blockID) - if err != nil { - return flow.ZeroID, storageerr.ErrNotFound - } - - return blockResult.Result.ExecutionReceipt.ExecutionResult.ID(), nil - }) -} - -func (ctx *testingContext) mockNewStorageSnapshot(result *execution.ComputationResult) { - // the result is the mocked result for the block, in other words, if the ingestion executes this block, - // the mocked computationManager will produce this result. - // so when mocking the StorageSnapshot method, it must be called with the StartState, as well as its - // parent block, which is used for retrieving the storage state at the end of the parent block. - ctx.executionState.On("NewStorageSnapshot", - *result.ExecutableBlock.StartState, - result.ExecutableBlock.Block.Header.ParentID, - result.ExecutableBlock.Block.Header.Height-1).Return(nil) -} - -func (ctx *testingContext) mockComputeBlock(store *mocks.MockBlockStore) { - ctx.computationManager.On("ComputeBlock", mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(func(ctx context.Context, - parentBlockExecutionResultID flow.Identifier, - block *entity.ExecutableBlock, - snapshot snapshot.StorageSnapshot) ( - *execution.ComputationResult, error) { - blockResult, ok := store.ResultByBlock[block.ID()] - if !ok { - return nil, fmt.Errorf("block %s not found", block.ID()) - } - return blockResult.Result, nil - }) -} - -func (ctx *testingContext) mockSaveExecutionResults(store *mocks.MockBlockStore, wg *sync.WaitGroup) { - ctx.executionState.On("SaveExecutionResults", mock.Anything, mock.Anything). - Return(func(ctx context.Context, result *execution.ComputationResult) error { - defer wg.Done() - err := store.MarkExecuted(result) - if err != nil { - return err - } - return nil - }) -} diff --git a/engine/execution/ingestion/machine.go b/engine/execution/ingestion/machine.go index a4d0a27ee5d..efb9f521b83 100644 --- a/engine/execution/ingestion/machine.go +++ b/engine/execution/ingestion/machine.go @@ -54,7 +54,7 @@ func NewMachine( broadcaster provider.ProviderEngine, uploader *uploader.Manager, stopControl *stop.StopControl, -) (*Machine, module.ReadyDoneAware, error) { +) (*Machine, *Core, error) { e := &Machine{ log: logger.With().Str("engine", "ingestion_machine").Logger(), diff --git a/engine/execution/ingestion/mempool.go b/engine/execution/ingestion/mempool.go deleted file mode 100644 index 58d2b11f923..00000000000 --- a/engine/execution/ingestion/mempool.go +++ /dev/null @@ -1,29 +0,0 @@ -package ingestion - -//revive:disable:unexported-return - -import ( - "github.com/onflow/flow-go/module/mempool/stdmap" -) - -type Mempool struct { - ExecutionQueue *stdmap.Queues - BlockByCollection *stdmap.BlockByCollections -} - -func (m *Mempool) Run(f func(blockByCollection *stdmap.BlockByCollectionBackdata, executionQueue *stdmap.QueuesBackdata) error) error { - return m.ExecutionQueue.Run(func(queueBackdata *stdmap.QueuesBackdata) error { - return m.BlockByCollection.Run(func(blockByCollectionBackdata *stdmap.BlockByCollectionBackdata) error { - return f(blockByCollectionBackdata, queueBackdata) - }) - }) -} - -func newMempool() *Mempool { - m := &Mempool{ - BlockByCollection: stdmap.NewBlockByCollections(), - ExecutionQueue: stdmap.NewQueues(), - } - - return m -} diff --git a/engine/testutil/mock/nodes.go b/engine/testutil/mock/nodes.go index d12a409cf57..8a2ea4fed1a 100644 --- a/engine/testutil/mock/nodes.go +++ b/engine/testutil/mock/nodes.go @@ -22,7 +22,7 @@ import ( "github.com/onflow/flow-go/engine/consensus/matching" "github.com/onflow/flow-go/engine/consensus/sealing" "github.com/onflow/flow-go/engine/execution/computation" - "github.com/onflow/flow-go/engine/execution/ingestion" + executionIngest "github.com/onflow/flow-go/engine/execution/ingestion" executionprovider "github.com/onflow/flow-go/engine/execution/provider" "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/engine/verification/assigner" @@ -192,7 +192,7 @@ func (cn ConsensusNode) Done() { type ExecutionNode struct { GenericNode FollowerState protocol.FollowerState - IngestionEngine *ingestion.Engine + IngestionEngine *executionIngest.Core ExecutionEngine *computation.Manager RequestEngine *requester.Engine ReceiptsEngine *executionprovider.Engine @@ -217,6 +217,7 @@ func (en ExecutionNode) Ready(ctx context.Context) { // new interface. irctx, _ := irrecoverable.WithSignaler(ctx) en.ReceiptsEngine.Start(irctx) + en.IngestionEngine.Start(irctx) en.FollowerCore.Start(irctx) en.FollowerEngine.Start(irctx) en.SyncEngine.Start(irctx) @@ -238,7 +239,6 @@ func (en ExecutionNode) Done(cancelFunc context.CancelFunc) { // to stop all (deprecated) ready-done-aware <-util.AllDone( - en.IngestionEngine, en.IngestionEngine, en.ReceiptsEngine, en.Ledger, diff --git a/engine/testutil/nodes.go b/engine/testutil/nodes.go index b6d1037b500..8f77a928e32 100644 --- a/engine/testutil/nodes.go +++ b/engine/testutil/nodes.go @@ -46,7 +46,6 @@ import ( "github.com/onflow/flow-go/engine/execution/computation/query" "github.com/onflow/flow-go/engine/execution/ingestion" exeFetcher "github.com/onflow/flow-go/engine/execution/ingestion/fetcher" - "github.com/onflow/flow-go/engine/execution/ingestion/loader" "github.com/onflow/flow-go/engine/execution/ingestion/stop" "github.com/onflow/flow-go/engine/execution/ingestion/uploader" executionprovider "github.com/onflow/flow-go/engine/execution/provider" @@ -728,31 +727,25 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ide ) fetcher := exeFetcher.NewCollectionFetcher(node.Log, requestEngine, node.State, false) - loader := loader.NewUnexecutedLoader(node.Log, node.State, node.Headers, execState) rootHead, rootQC := getRoot(t, &node) - ingestionEngine, err := ingestion.New( - unit, + _, ingestionCore, err := ingestion.NewMachine( node.Log, - node.Net, + node.ProtocolEvents, + requestEngine, fetcher, node.Headers, node.Blocks, collectionsStorage, - computationEngine, - pusherEngine, execState, + node.State, node.Metrics, - node.Tracer, - false, - nil, + computationEngine, + pusherEngine, uploader, stopControl, - loader, ) require.NoError(t, err) - requestEngine.WithHandle(ingestionEngine.OnCollection) - - node.ProtocolEvents.AddConsumer(ingestionEngine) + node.ProtocolEvents.AddConsumer(stopControl) followerCore, finalizer := createFollowerCore(t, &node, followerState, followerDistributor, rootHead, rootQC) // mock out hotstuff validator @@ -815,7 +808,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ide return testmock.ExecutionNode{ GenericNode: node, FollowerState: followerState, - IngestionEngine: ingestionEngine, + IngestionEngine: ingestionCore, FollowerCore: followerCore, FollowerEngine: followerEng, SyncEngine: syncEngine, diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index 200926bf83a..a58441701b9 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -390,14 +390,11 @@ func prepareExecutionService(container testnet.ContainerConfig, i int, n int) Se panic(err) } - enableNewIngestionEngine := true - service.Command = append(service.Command, "--triedir=/trie", fmt.Sprintf("--rpc-addr=%s:%s", container.ContainerName, testnet.GRPCPort), fmt.Sprintf("--cadence-tracing=%t", cadenceTracing), fmt.Sprintf("--extensive-tracing=%t", extesiveTracing), - fmt.Sprintf("--enable-new-ingestion-engine=%v", enableNewIngestionEngine), "--execution-data-dir=/data/execution-data", "--chunk-data-pack-dir=/data/chunk-data-pack", ) diff --git a/module/metrics.go b/module/metrics.go index f43c8b9325e..5615a00704a 100644 --- a/module/metrics.go +++ b/module/metrics.go @@ -1019,9 +1019,6 @@ type ExecutionMetrics interface { // ExecutionCollectionRequestSent reports when a request for a collection is sent to a collection node ExecutionCollectionRequestSent() - // Unused - ExecutionCollectionRequestRetried() - // ExecutionSync reports when the state syncing is triggered or stopped. ExecutionSync(syncing bool) diff --git a/module/metrics/execution.go b/module/metrics/execution.go index 37d113061b7..e269a70de64 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -849,7 +849,6 @@ func (ec *ExecutionCollector) ExecutionTransactionExecuted( if stats.Failed { ec.totalFailedTransactionsCounter.Inc() } - } // ExecutionChunkDataPackGenerated reports stats on chunk data pack generation @@ -982,10 +981,6 @@ func (ec *ExecutionCollector) ExecutionCollectionRequestSent() { ec.collectionRequestSent.Inc() } -func (ec *ExecutionCollector) ExecutionCollectionRequestRetried() { - ec.collectionRequestRetried.Inc() -} - func (ec *ExecutionCollector) ExecutionBlockDataUploadStarted() { ec.blockDataUploadsInProgress.Inc() } diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 17460bf460a..70ede3eb090 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -193,7 +193,6 @@ func (nc *NoopCollector) ReadValuesSize(byte uint64) func (nc *NoopCollector) ReadDuration(duration time.Duration) {} func (nc *NoopCollector) ReadDurationPerItem(duration time.Duration) {} func (nc *NoopCollector) ExecutionCollectionRequestSent() {} -func (nc *NoopCollector) ExecutionCollectionRequestRetried() {} func (nc *NoopCollector) RuntimeTransactionParsed(dur time.Duration) {} func (nc *NoopCollector) RuntimeTransactionChecked(dur time.Duration) {} func (nc *NoopCollector) RuntimeTransactionInterpreted(dur time.Duration) {} diff --git a/module/mock/execution_metrics.go b/module/mock/execution_metrics.go index 6adc14e02a2..619fca3d60e 100644 --- a/module/mock/execution_metrics.go +++ b/module/mock/execution_metrics.go @@ -71,11 +71,6 @@ func (_m *ExecutionMetrics) ExecutionCollectionExecuted(dur time.Duration, stats _m.Called(dur, stats) } -// ExecutionCollectionRequestRetried provides a mock function with given fields: -func (_m *ExecutionMetrics) ExecutionCollectionRequestRetried() { - _m.Called() -} - // ExecutionCollectionRequestSent provides a mock function with given fields: func (_m *ExecutionMetrics) ExecutionCollectionRequestSent() { _m.Called()