Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockchain: Make block index flushable. #1375

Merged
merged 1 commit into from
Jul 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions blockchain/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,6 @@ func (b *BlockChain) maybeAcceptBlock(block *dcrutil.Block, flags BehaviorFlags)
// node.
b.pruner.pruneChainIfNeeded()

// Create a new block node for the block and add it to the block index.
// The block could either be on a side chain or the main chain, but it
// starts off as a side chain regardless.
blockHeader := &block.MsgBlock().Header
newNode := newBlockNode(blockHeader, prevNode)
newNode.populateTicketInfo(stake.FindSpentTicketsInBlock(block.MsgBlock()))
newNode.status = statusDataStored
b.index.AddNode(newNode)

// Insert the block into the database if it's not already there. Even
// though it is possible the block will ultimately fail to connect, it
// has already passed all proof-of-work and validity tests which means
Expand All @@ -160,19 +151,24 @@ func (b *BlockChain) maybeAcceptBlock(block *dcrutil.Block, flags BehaviorFlags)
// expensive connection logic. It also has some other nice properties
// such as making blocks that never become part of the main chain or
// blocks that fail to connect available for further analysis.
//
// Also, store the associated block index entry.
err = b.db.Update(func(dbTx database.Tx) error {
if err := dbMaybeStoreBlock(dbTx, block); err != nil {
return err
}
return dbMaybeStoreBlock(dbTx, block)
})
if err != nil {
return 0, err
}

if err := dbPutBlockNode(dbTx, newNode); err != nil {
return err
}
// Create a new block node for the block and add it to the block index.
// The block could either be on a side chain or the main chain, but it
// starts off as a side chain regardless.
blockHeader := &block.MsgBlock().Header
newNode := newBlockNode(blockHeader, prevNode)
newNode.populateTicketInfo(stake.FindSpentTicketsInBlock(block.MsgBlock()))
newNode.status = statusDataStored
b.index.AddNode(newNode)

return nil
})
// Ensure the new block index entry is written to the database.
err = b.index.flush()
if err != nil {
return 0, err
}
Expand Down
49 changes: 47 additions & 2 deletions blockchain/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,18 @@ type blockIndex struct {
db database.DB
chainParams *chaincfg.Params

// These following fields are protected by the embedded mutex.
//
// index contains an entry for every known block tracked by the block
// index.
//
// modified contains an entry for all nodes that have been modified
// since the last time the index was flushed to disk.
//
// chainTips contains an entry with the tip of all known side chains.
sync.RWMutex
index map[chainhash.Hash]*blockNode
modified map[*blockNode]struct{}
chainTips map[int64][]*blockNode
}

Expand All @@ -325,6 +335,7 @@ func newBlockIndex(db database.DB, chainParams *chaincfg.Params) *blockIndex {
db: db,
chainParams: chainParams,
index: make(map[chainhash.Hash]*blockNode),
modified: make(map[*blockNode]struct{}),
chainTips: make(map[int64][]*blockNode),
}
}
Expand Down Expand Up @@ -357,13 +368,14 @@ func (bi *blockIndex) addNode(node *blockNode) {
}
}

// AddNode adds the provided node to the block index. Duplicate entries are not
// checked so it is up to caller to avoid adding them.
// AddNode adds the provided node to the block index and marks it as modified.
// Duplicate entries are not checked so it is up to caller to avoid adding them.
//
// This function is safe for concurrent access.
func (bi *blockIndex) AddNode(node *blockNode) {
bi.Lock()
bi.addNode(node)
bi.modified[node] = struct{}{}
bi.Unlock()
}

Expand Down Expand Up @@ -433,6 +445,7 @@ func (bi *blockIndex) NodeStatus(node *blockNode) blockStatus {
func (bi *blockIndex) SetStatusFlags(node *blockNode, flags blockStatus) {
bi.Lock()
node.status |= flags
bi.modified[node] = struct{}{}
bi.Unlock()
}

Expand All @@ -443,5 +456,37 @@ func (bi *blockIndex) SetStatusFlags(node *blockNode, flags blockStatus) {
func (bi *blockIndex) UnsetStatusFlags(node *blockNode, flags blockStatus) {
bi.Lock()
node.status &^= flags
bi.modified[node] = struct{}{}
bi.Unlock()
}

// flush writes all of the modified block nodes to the database and clears the
// set of modified nodes if it succeeds.
func (bi *blockIndex) flush() error {
// Nothing to flush if there are no modified nodes.
bi.Lock()
if len(bi.modified) == 0 {
bi.Unlock()
return nil
}

// Write all of the nodes in the set of modified nodes to the database.
err := bi.db.Update(func(dbTx database.Tx) error {
for node := range bi.modified {
err := dbPutBlockNode(dbTx, node)
if err != nil {
return err
}
}
return nil
})
if err != nil {
bi.Unlock()
return err
}

// Clear the set of modified nodes.
bi.modified = make(map[*blockNode]struct{})
bi.Unlock()
return nil
}
75 changes: 59 additions & 16 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,9 @@ func (b *BlockChain) isMajorityVersion(minVer int32, startNode *blockNode, numRe
// passed node is not on a side chain or if the reorganize would involve
// reorganizing to a known invalid chain.
//
// This function may modify the validation state of nodes in the block index
// without flushing.
//
// This function MUST be called with the chain state lock held (for reads).
func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List) {
// Nothing to detach or attach if there is no node.
Expand Down Expand Up @@ -738,6 +741,12 @@ func (b *BlockChain) connectBlock(node *blockNode, block, parent *dcrutil.Block,
countSpentOutputs(block, parent))
}

// Write any modified block index entries to the database before
// updating the best state.
if err := b.index.flush(); err != nil {
return err
}

// Generate a new best state snapshot that will be used to update the
// database and later memory if all database updates are successful.
b.stateLock.RLock()
Expand Down Expand Up @@ -773,16 +782,6 @@ func (b *BlockChain) connectBlock(node *blockNode, block, parent *dcrutil.Block,
return err
}

// Add the block to the block index. Ultimately the block index
// should track modified nodes and persist all of them prior
// this point as opposed to unconditionally peristing the node
// again. However, this is needed for now in lieu of that to
// ensure the updated status is written to the database.
err = dbPutBlockNode(dbTx, node)
if err != nil {
return err
}

// Update the utxo set using the state of the utxo view. This
// entails removing all of the utxos spent and adding the new
// ones created by the block.
Expand Down Expand Up @@ -912,6 +911,12 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block, parent *dcrutil.Blo
tip.height)
}

// Write any modified block index entries to the database before
// updating the best state.
if err := b.index.flush(); err != nil {
return err
}

// Generate a new best state snapshot that will be used to update the
// database and later memory if all database updates are successful.
b.stateLock.RLock()
Expand Down Expand Up @@ -1062,6 +1067,10 @@ func countNumberOfTransactions(block, parent *dcrutil.Block) uint64 {
// the chain) and nodes the are being attached must be in forwards order
// (think pushing them onto the end of the chain).
//
// This function may modify the validation state of nodes in the block index
// without flushing in the case the chain is not able to reorganize due to a
// block failing to connect.
//
// This function MUST be called with the chain state lock held (for writes).
func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error {
// Nothing to do if no reorganize nodes were provided.
Expand Down Expand Up @@ -1371,6 +1380,9 @@ func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error
// block hash requested, so long as it matches up with the current organization
// of the best chain.
//
// This function may modify the validation state of nodes in the block index
// without flushing.
//
// This function MUST be called with the chain state lock held (for writes).
func (b *BlockChain) forceHeadReorganization(formerBest chainhash.Hash, newBest chainhash.Hash) error {
if formerBest.IsEqual(&newBest) {
Expand Down Expand Up @@ -1464,15 +1476,35 @@ func (b *BlockChain) forceHeadReorganization(formerBest chainhash.Hash, newBest
b.index.SetStatusFlags(newBestNode, statusValid)
}

// Reorganize the chain and flush any potential unsaved changes to the
// block index to the database. It is safe to ignore any flushing
// errors here as the only time the index will be modified is if the
// block failed to connect.
attach, detach := b.getReorganizeNodes(newBestNode)
return b.reorganizeChain(attach, detach)
err := b.reorganizeChain(attach, detach)
b.flushBlockIndexWarnOnly()
return err
}

// ForceHeadReorganization is the exported version of forceHeadReorganization.
func (b *BlockChain) ForceHeadReorganization(formerBest chainhash.Hash, newBest chainhash.Hash) error {
b.chainLock.Lock()
defer b.chainLock.Unlock()
return b.forceHeadReorganization(formerBest, newBest)
err := b.forceHeadReorganization(formerBest, newBest)
b.chainLock.Unlock()
return err
}

// flushBlockIndexWarnOnly attempts to flush and modified block index nodes to
// the database and will log a warning if it fails.
//
// NOTE: This MUST only be used in the specific circumstances where failure to
// flush only results in a worst case scenario of requiring one or more blocks
// to be validated again. All other cases must directly call the function on
// the block index and check the error return accordingly.
func (b *BlockChain) flushBlockIndexWarnOnly() {
if err := b.index.flush(); err != nil {
log.Warnf("Unable to flush block index changes to db: %v", err)
}
}

// connectBestChain handles connecting the passed block to the chain while
Expand Down Expand Up @@ -1518,17 +1550,24 @@ func (b *BlockChain) connectBestChain(node *blockNode, block, parent *dcrutil.Bl
view.SetStakeViewpoint(ViewpointPrevValidInitial)
var stxos []spentTxOut
if !fastAdd {
// Validate the block and set the applicable status
// result in the block index.
// Validate the block, set the applicable status result
// in the block index, and flush the status changes to
// the database. It is safe to ignore any errors when
// flushing here as the changes will be flushed when a
// valid block is connected, and the worst case scenario
// if a block a invalid is it would need to be
// revalidated after a restart.
err := b.checkConnectBlock(node, block, parent, view,
&stxos)
if err != nil {
if _, ok := err.(RuleError); ok {
b.index.SetStatusFlags(node, statusValidateFailed)
b.flushBlockIndexWarnOnly()
}
return 0, err
}
b.index.SetStatusFlags(node, statusValid)
b.flushBlockIndexWarnOnly()
}

// In the fast add case the code to check the block connection
Expand Down Expand Up @@ -1598,9 +1637,13 @@ func (b *BlockChain) connectBestChain(node *blockNode, block, parent *dcrutil.Bl
// common ancenstor (the point where the chain forked).
detachNodes, attachNodes := b.getReorganizeNodes(node)

// Reorganize the chain.
// Reorganize the chain and flush any potential unsaved changes to the
// block index to the database. It is safe to ignore any flushing
// errors here as the only time the index will be modified is if the
// block failed to connect.
log.Infof("REORGANIZE: Block %v is causing a reorganize.", node.hash)
err := b.reorganizeChain(detachNodes, attachNodes)
b.flushBlockIndexWarnOnly()
if err != nil {
return 0, err
}
Expand Down
6 changes: 6 additions & 0 deletions blockchain/stakenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ func (b *BlockChain) fetchStakeNode(node *blockNode) (*stake.Node, error) {
detachNodes, attachNodes := b.getReorganizeNodes(node)
current := b.bestChain.Tip()

// Flush any potential unsaved changes to the block index due to the
// call to get the reorganize nodes. Since the best state is not being
// being modified, it is safe to ignore any errors here as the changes
// will be flushed at that point and those errors are not ignored.
b.flushBlockIndexWarnOnly()

// Move backwards through the main chain, undoing the ticket
// treaps for each block. The database is passed because the
// undo data and new tickets data for each block may not yet
Expand Down
6 changes: 6 additions & 0 deletions blockchain/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2615,6 +2615,12 @@ func (b *BlockChain) CheckConnectBlockTemplate(block *dcrutil.Block) error {
// just before the requested node.
detachNodes, attachNodes := b.getReorganizeNodes(prevNode)

// Flush any potential unsaved changes to the block index due to the
// call to get the reorganize nodes. Since the best state is not being
// modified, it is safe to ignore any errors here as the changes will be
// flushed at that point and those errors are not ignored.
b.flushBlockIndexWarnOnly()

view := NewUtxoViewpoint()
view.SetBestHash(&tip.hash)
view.SetStakeViewpoint(ViewpointPrevValidInitial)
Expand Down