diff --git a/internal/netsync/manager.go b/internal/netsync/manager.go index d13a1267f..27188df5a 100644 --- a/internal/netsync/manager.go +++ b/internal/netsync/manager.go @@ -223,7 +223,26 @@ type Peer struct { // longer useful or are otherwise being malicious. numConsecutiveOrphanHeaders int32 - lastAnnouncedBlock *chainhash.Hash + // These fields are used to track the best known block announced by the peer + // which in turn provides a means to discover which blocks are available to + // download from the peer. + // + // announcedOrphanBlock is the hash of the most recently announced block + // that did not connect to any headers known to the local chain at the time + // of the announcement. It is tracked because such announcements are + // typically for newly found blocks whose parent headers will eventually + // become known and therefore have a fairly good chance of becoming the + // block with the most cumulative proof of work that the peer has announced. + // + // bestAnnouncedBlock is the hash of the block with the most cumulative + // proof of work that the peer has announced that is also known to the local + // chain. + // + // bestAnnouncedWork is the cumulative proof of work for the associated best + // announced block hash. + announcedOrphanBlock *chainhash.Hash + bestAnnouncedBlock *chainhash.Hash + bestAnnouncedWork *uint256.Uint256 } // NewPeer returns a new instance of a peer that wraps the provided underlying @@ -649,8 +668,23 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *Peer) { m.peers[peer] = struct{}{} + // Request headers starting from the parent of the best known header for the + // local chain immediately when the initial headers sync process is complete + // and the peer is a sync candidate. + // + // This primarily serves two purposes: + // + // 1) It immediately discovers any blocks that are not already known + // 2) It provides accurate discovery of the best known block of the peer + // + // Note that the parent is used because the request would otherwise result + // in an empty response when both the local and remote tips are the same. + if peer.syncCandidate && m.hdrSyncState.headersSynced { + m.fetchNextHeaders(peer) + } + // Start syncing by choosing the best candidate if needed. - if peer.syncCandidate && m.syncPeer == nil { + if m.syncPeer == nil { m.startSync() } @@ -891,6 +925,55 @@ func (m *SyncManager) maybeUpdateIsCurrent() { } } +// maybeUpdateBestAnnouncedBlock potentially updates the block with the most +// cumulative proof of work that the given peer has announced which includes its +// associated hash, cumulative work sum, and height. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) maybeUpdateBestAnnouncedBlock(p *Peer, hash *chainhash.Hash, header *wire.BlockHeader) { + chain := m.cfg.Chain + workSum, err := chain.ChainWork(hash) + if err != nil { + return + } + + // Update the best block and associated values when the cumulative work for + // given block exceeds that of the current best known block for the peer. + if p.bestAnnouncedWork == nil || workSum.Gt(p.bestAnnouncedWork) { + p.bestAnnouncedBlock = hash + p.bestAnnouncedWork = &workSum + p.UpdateLastBlockHeight(int64(header.Height)) + } +} + +// maybeResolveOrphanBlock potentially resolves the most recently announced +// block by the peer that did not connect to any headers known to the local +// chain at the time of the announcement by checking if it is now known and, +// when it is, potentially making it the block with the most cumulative proof of +// work announced by the peer if needed. +// +// This function is NOT safe for concurrent access. It must be called from the +// event handler goroutine. +func (m *SyncManager) maybeResolveOrphanBlock(p *Peer) { + // Nothing to do if there isn't a pending orphan block announcement that has + // not yet been resolved or the block still isn't known. + chain := m.cfg.Chain + blockHash := p.announcedOrphanBlock + if blockHash == nil || !chain.HaveHeader(blockHash) { + return + } + + // The block has now been resolved, so potentially make it the block with + // the most cumulative proof of work announced by the peer. + header, err := chain.HeaderByHash(blockHash) + if err != nil { + log.Warnf("Unable to retrieve known good header %s: %v", blockHash, err) + return + } + m.maybeUpdateBestAnnouncedBlock(p, blockHash, &header) +} + // processBlock processes the provided block using the internal chain instance. // // When no errors occurred during processing, the first return value indicates @@ -1057,30 +1140,6 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) { m.cfg.MixPool.ExpireMessagesInBackground(header.Height) } - // Update the latest block height for the peer to avoid stale heights when - // looking for future potential sync node candidacy. - // - // Also, when the chain is considered current and the block was accepted to - // the main chain, update the heights of other peers whose invs may have - // been ignored when actively syncing while the chain was not yet current or - // lost the lock announcement race. - blockHeight := int64(header.Height) - peer.UpdateLastBlockHeight(blockHeight) - if onMainChain && m.IsCurrent() { - for p := range m.peers { - // The height for the sending peer is already updated. - if p == peer { - continue - } - - lastAnnBlock := p.lastAnnouncedBlock - if lastAnnBlock != nil && *lastAnnBlock == *blockHash { - p.UpdateLastBlockHeight(blockHeight) - p.lastAnnouncedBlock = nil - } - } - } - // Request more blocks using the headers when the request queue is getting // short. if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks { @@ -1167,21 +1226,15 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock) headersSynced := m.hdrSyncState.headersSynced if !firstHeaderConnects { - // Ignore headers that do not connect to any known headers when the - // initial headers sync is taking place. It is expected that headers - // will be announced that are not yet known. - if !headersSynced { - return - } - // Attempt to detect block announcements which do not connect to any // known headers and request any headers starting from the best header // the local chain knows in order to (hopefully) discover the missing - // headers. + // headers unless the initial headers sync process is still in progress. // // Meanwhile, also keep track of how many times the peer has - // consecutively sent a headers message that does not connect and - // disconnect it once the max allowed threshold has been reached. + // consecutively sent a headers message that looks like an announcement + // that does not connect and disconnect it once the max allowed + // threshold has been reached. if numHeaders < maxExpectedHeaderAnnouncementsPerMsg { peer.numConsecutiveOrphanHeaders++ if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders { @@ -1189,23 +1242,50 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { "not connect from peer %s -- disconnecting", peer.numConsecutiveOrphanHeaders, peer) peer.Disconnect() + return } - log.Debugf("Requesting missing parents for header %s (height %d) "+ - "received from peer %s", firstHeaderHash, firstHeader.Height, - peer) - bestHeaderHash, _ := chain.BestHeader() - blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash) - locator := chainBlockLocatorToHashes(blkLocator) - peer.PushGetHeadersMsg(locator, &zeroHash) + if headersSynced { + log.Debugf("Requesting missing parents for header %s (height "+ + "%d) received from peer %s", firstHeaderHash, + firstHeader.Height, peer) + bestHeaderHash, _ := chain.BestHeader() + blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash) + locator := chainBlockLocatorToHashes(blkLocator) + peer.PushGetHeadersMsg(locator, &zeroHash) + } + // Track the final announced header as the most recently announced + // block by the peer that does not connect to any headers known to + // the local chain since there is a good chance it will eventually + // become known either from this peer or others. + m.maybeResolveOrphanBlock(peer) + finalHeader := headers[len(headers)-1] + finalHeaderHash := finalHeader.BlockHash() + peer.announcedOrphanBlock = &finalHeaderHash + + // Update the latest block height for the peer to avoid stale + // heights when looking for future potential header sync node + // candidacy when the initial headers sync process is still in + // progess. + if !headersSynced { + peer.UpdateLastBlockHeight(int64(finalHeader.Height)) + } return } - // The initial headers sync process is done and this does not appear to - // be a block announcement, so disconnect the peer. - log.Debugf("Received orphan header from peer %s -- disconnecting", peer) - peer.Disconnect() + // Disconnect the peer when the initial headers sync process is done and + // this does not appear to be a block announcement. + if headersSynced { + log.Debugf("Received orphan header from peer %s -- disconnecting", + peer) + peer.Disconnect() + return + } + + // Ignore headers that do not connect to any known headers when the + // initial headers sync is taking place. It is expected that headers + // will be announced that are not yet known. return } @@ -1273,12 +1353,13 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // of the provided headers are successfully processed above. peer.numConsecutiveOrphanHeaders = 0 - // Update the last announced block to the final one in the announced headers - // above and update the height for the peer too. + // Potentially resolve a previously unknown announced block and then update + // the block with the most cumulative proof of work the peer has announced + // to the final announced header if needed. finalHeader := headers[len(headers)-1] finalReceivedHash := &headerHashes[len(headerHashes)-1] - peer.lastAnnouncedBlock = finalReceivedHash - peer.UpdateLastBlockHeight(int64(finalHeader.Height)) + m.maybeResolveOrphanBlock(peer) + m.maybeUpdateBestAnnouncedBlock(peer, finalReceivedHash, finalHeader) // Update the sync height if the new best known header height exceeds it. syncHeight := m.SyncHeight() @@ -1335,6 +1416,18 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) { log.Info("Syncing chain") m.progressLogger.SetLastLogTime(time.Now()) + // Request headers starting from the parent of the best known header + // for the local chain from any sync candidates that have not yet + // had their best known block discovered now that the initial + // headers sync process is complete. + for peer := range m.peers { + m.maybeResolveOrphanBlock(peer) + if !peer.syncCandidate || peer.bestAnnouncedBlock != nil { + continue + } + m.fetchNextHeaders(peer) + } + // Potentially update whether the chain believes it is current now // that the headers are synced. chain.MaybeUpdateIsCurrent() @@ -1534,15 +1627,23 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) { } if lastBlock != nil { - // Update the last announced block to the final one in the announced - // inventory above (if any). In the case the header for that block is - // already known, use that information to update the height for the peer - // too. - peer.lastAnnouncedBlock = &lastBlock.Hash - if isCurrent { + // Determine if the final announced block is already known to the local + // chain and then either track it as the most recently announced + // block by the peer that does not connect to any headers known to the + // local chain or potentially make it the block with the most cumulative + // proof of work announced by the peer when it is already known. + if !m.cfg.Chain.HaveHeader(&lastBlock.Hash) { + // Notice a copy of the hash is made here to avoid keeping a + // reference into the inventory vector which would prevent it from + // being GCd. + lastBlockHash := lastBlock.Hash + m.maybeResolveOrphanBlock(peer) + peer.announcedOrphanBlock = &lastBlockHash + } else { header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash) if err == nil { - peer.UpdateLastBlockHeight(int64(header.Height)) + m.maybeResolveOrphanBlock(peer) + m.maybeUpdateBestAnnouncedBlock(peer, &lastBlock.Hash, &header) } } }