Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netsync: Track best known blocks per peer. #3443

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 158 additions & 57 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,26 @@ type Peer struct {
// longer useful or are otherwise being malicious.
numConsecutiveOrphanHeaders int32

lastAnnouncedBlock *chainhash.Hash
// These fields are used to track the best known block announced by the peer
// which in turn provides a means to discover which blocks are available to
// download from the peer.
//
// announcedOrphanBlock is the hash of the most recently announced block
// that did not connect to any headers known to the local chain at the time
// of the announcement. It is tracked because such announcements are
// typically for newly found blocks whose parent headers will eventually
// become known and therefore have a fairly good chance of becoming the
// block with the most cumulative proof of work that the peer has announced.
//
// bestAnnouncedBlock is the hash of the block with the most cumulative
// proof of work that the peer has announced that is also known to the local
// chain.
//
// bestAnnouncedWork is the cumulative proof of work for the associated best
// announced block hash.
announcedOrphanBlock *chainhash.Hash
bestAnnouncedBlock *chainhash.Hash
bestAnnouncedWork *uint256.Uint256
}

// NewPeer returns a new instance of a peer that wraps the provided underlying
Expand Down Expand Up @@ -649,6 +668,21 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *Peer) {

m.peers[peer] = struct{}{}

// Request headers starting from the parent of the best known header for the
// local chain immediately when the initial headers sync process is complete
// and the peer is a sync candidate.
//
// This primarily serves two purposes:
//
// 1) It immediately discovers any blocks that are not already known
// 2) It provides accurate discovery of the best known block of the peer
//
// Note that the parent is used because the request would otherwise result
// in an empty response when both the local and remote tips are the same.
if peer.syncCandidate && m.hdrSyncState.headersSynced {
m.fetchNextHeaders(peer)
}

// Start syncing by choosing the best candidate if needed.
if peer.syncCandidate && m.syncPeer == nil {
m.startSync()
Expand Down Expand Up @@ -891,6 +925,55 @@ func (m *SyncManager) maybeUpdateIsCurrent() {
}
}

// maybeUpdateBestAnnouncedBlock potentially updates the block with the most
// cumulative proof of work that the given peer has announced which includes its
// associated hash, cumulative work sum, and height.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) maybeUpdateBestAnnouncedBlock(p *Peer, hash *chainhash.Hash, header *wire.BlockHeader) {
chain := m.cfg.Chain
workSum, err := chain.ChainWork(hash)
if err != nil {
return
}

// Update the best block and associated values when the cumulative work for
// given block exceeds that of the current best known block for the peer.
if p.bestAnnouncedWork == nil || workSum.Gt(p.bestAnnouncedWork) {
p.bestAnnouncedBlock = hash
p.bestAnnouncedWork = &workSum
p.UpdateLastBlockHeight(int64(header.Height))
}
}

// maybeResolveOrphanBlock potentially resolves the most recently announced
// block by the peer that did not connect to any headers known to the local
// chain at the time of the announcement by checking if it is now known and,
// when it is, potentially making it the block with the most cumulative proof of
// work announced by the peer if needed.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) maybeResolveOrphanBlock(p *Peer) {
// Nothing to do if there isn't a pending orphan block announcement that has
// not yet been resolved or the block still isn't known.
chain := m.cfg.Chain
blockHash := p.announcedOrphanBlock
if blockHash == nil || !chain.HaveHeader(blockHash) {
return
}

// The block has now been resolved, so potentially make it the block with
// the most cumulative proof of work announced by the peer.
header, err := chain.HeaderByHash(blockHash)
if err != nil {
log.Warnf("Unable to retrieve known good header %s: %v", blockHash, err)
return
}
m.maybeUpdateBestAnnouncedBlock(p, blockHash, &header)
}

// processBlock processes the provided block using the internal chain instance.
//
// When no errors occurred during processing, the first return value indicates
Expand Down Expand Up @@ -1057,30 +1140,6 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) {
m.cfg.MixPool.ExpireMessagesInBackground(header.Height)
}

// Update the latest block height for the peer to avoid stale heights when
// looking for future potential sync node candidacy.
//
// Also, when the chain is considered current and the block was accepted to
// the main chain, update the heights of other peers whose invs may have
// been ignored when actively syncing while the chain was not yet current or
// lost the lock announcement race.
blockHeight := int64(header.Height)
peer.UpdateLastBlockHeight(blockHeight)
if onMainChain && m.IsCurrent() {
for p := range m.peers {
// The height for the sending peer is already updated.
if p == peer {
continue
}

lastAnnBlock := p.lastAnnouncedBlock
if lastAnnBlock != nil && *lastAnnBlock == *blockHash {
p.UpdateLastBlockHeight(blockHeight)
p.lastAnnouncedBlock = nil
}
}
}

// Request more blocks using the headers when the request queue is getting
// short.
if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks {
Expand Down Expand Up @@ -1167,45 +1226,66 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock)
headersSynced := m.hdrSyncState.headersSynced
if !firstHeaderConnects {
// Ignore headers that do not connect to any known headers when the
// initial headers sync is taking place. It is expected that headers
// will be announced that are not yet known.
if !headersSynced {
return
}

// Attempt to detect block announcements which do not connect to any
// known headers and request any headers starting from the best header
// the local chain knows in order to (hopefully) discover the missing
// headers.
// headers unless the initial headers sync process is still in progress.
//
// Meanwhile, also keep track of how many times the peer has
// consecutively sent a headers message that does not connect and
// disconnect it once the max allowed threshold has been reached.
// consecutively sent a headers message that looks like an announcement
// that does not connect and disconnect it once the max allowed
// threshold has been reached.
if numHeaders < maxExpectedHeaderAnnouncementsPerMsg {
peer.numConsecutiveOrphanHeaders++
if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders {
log.Debugf("Received %d consecutive headers messages that do "+
"not connect from peer %s -- disconnecting",
peer.numConsecutiveOrphanHeaders, peer)
peer.Disconnect()
return
}

log.Debugf("Requesting missing parents for header %s (height %d) "+
"received from peer %s", firstHeaderHash, firstHeader.Height,
peer)
bestHeaderHash, _ := chain.BestHeader()
blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash)
locator := chainBlockLocatorToHashes(blkLocator)
peer.PushGetHeadersMsg(locator, &zeroHash)
if headersSynced {
log.Debugf("Requesting missing parents for header %s (height "+
"%d) received from peer %s", firstHeaderHash,
firstHeader.Height, peer)
bestHeaderHash, _ := chain.BestHeader()
blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash)
locator := chainBlockLocatorToHashes(blkLocator)
peer.PushGetHeadersMsg(locator, &zeroHash)
}

// Track the final announced header as the most recently announced
// block by the peer that does not connect to any headers known to
// the local chain since there is a good chance it will eventually
// become known either from this peer or others.
m.maybeResolveOrphanBlock(peer)
finalHeader := headers[len(headers)-1]
finalHeaderHash := finalHeader.BlockHash()
peer.announcedOrphanBlock = &finalHeaderHash

// Update the latest block height for the peer to avoid stale
// heights when looking for future potential header sync node
// candidacy when the initial headers sync process is still in
// progess.
if !headersSynced {
peer.UpdateLastBlockHeight(int64(finalHeader.Height))
}
return
}

// The initial headers sync process is done and this does not appear to
// be a block announcement, so disconnect the peer.
log.Debugf("Received orphan header from peer %s -- disconnecting", peer)
peer.Disconnect()
// Disconnect the peer when the initial headers sync process is done and
// this does not appear to be a block announcement.
if headersSynced {
log.Debugf("Received orphan header from peer %s -- disconnecting",
peer)
peer.Disconnect()
return
}

// Ignore headers that do not connect to any known headers when the
// initial headers sync is taking place. It is expected that headers
// will be announced that are not yet known.
return
}

Expand Down Expand Up @@ -1273,12 +1353,13 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// of the provided headers are successfully processed above.
peer.numConsecutiveOrphanHeaders = 0

// Update the last announced block to the final one in the announced headers
// above and update the height for the peer too.
// Potentially resolve a previously unknown announced block and then update
// the block with the most cumulative proof of work the peer has announced
// to the final announced header if needed.
finalHeader := headers[len(headers)-1]
finalReceivedHash := &headerHashes[len(headerHashes)-1]
peer.lastAnnouncedBlock = finalReceivedHash
peer.UpdateLastBlockHeight(int64(finalHeader.Height))
m.maybeResolveOrphanBlock(peer)
m.maybeUpdateBestAnnouncedBlock(peer, finalReceivedHash, finalHeader)

// Update the sync height if the new best known header height exceeds it.
syncHeight := m.SyncHeight()
Expand Down Expand Up @@ -1335,6 +1416,18 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Info("Syncing chain")
m.progressLogger.SetLastLogTime(time.Now())

// Request headers starting from the parent of the best known header
// for the local chain from any sync candidates that have not yet
// had their best known block discovered now that the initial
// headers sync process is complete.
for peer := range m.peers {
m.maybeResolveOrphanBlock(peer)
Comment on lines +1423 to +1424
Copy link
Member Author

@davecgh davecgh Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm completely new to this code but from a purely logical perspective it seems like it would be sensible to call maybeResolveOrphanBlock on all peers in onInitialChainSyncDone. Once the sync is known/suspected to be complete, it should be possible to resolve all orphan blocks announced by peers (if they are part of the best chain).

EDIT: Now that I am looking further, perhaps it doesn't matter because the announcedOrphanBlock field is only used before sync is completed, so updating it upon completion is unnecessary?

It's probably not entirely clear from this PR alone since nothing actually uses it yet, but ultimately it applies both before and after the initial chain sync is completed. However, it isn't needed in onInitialchainSyncDone because of how the overall sync works.

For a quick recap, there are 2 phases. The first phase is the initial headers sync and the second phase is the chain (block) sync. onInitialChainSyncDone happens at the end of the second phase.

The first phase is where you're realistically going to see all orphans barring some kind of chain split, because you don't know all the headers yet, so any block announcement headers will necessarily not be able to connect and thus be orphans.

However, in the second phase, all headers are known (or at least believed to be known) and thus any announcements for new blocks that show up while the chain sync is taking place will realistically be known. In the event they aren't known, attempting to resolve them once all the blocks are downloaded and linked will not gain you anything because there are no new headers involved.

Also, once phase one is done, new headers continue to be added and resolved in parallel with the chain (block) sync.

Notice that the code I'm commenting on resolves the orphans once the initial headers sync is done which is likely what you were mostly thinking about with your keen observation.

if !peer.syncCandidate || peer.bestAnnouncedBlock != nil {
continue
}
m.fetchNextHeaders(peer)
}

// Potentially update whether the chain believes it is current now
// that the headers are synced.
chain.MaybeUpdateIsCurrent()
Expand Down Expand Up @@ -1534,15 +1627,23 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) {
}

if lastBlock != nil {
// Update the last announced block to the final one in the announced
// inventory above (if any). In the case the header for that block is
// already known, use that information to update the height for the peer
// too.
peer.lastAnnouncedBlock = &lastBlock.Hash
if isCurrent {
// Determine if the final announced block is already known to the local
// chain and then either track it as the most recently announced
// block by the peer that does not connect to any headers known to the
// local chain or potentially make it the block with the most cumulative
// proof of work announced by the peer when it is already known.
if !m.cfg.Chain.HaveHeader(&lastBlock.Hash) {
// Notice a copy of the hash is made here to avoid keeping a
// reference into the inventory vector which would prevent it from
// being GCd.
lastBlockHash := lastBlock.Hash
m.maybeResolveOrphanBlock(peer)
peer.announcedOrphanBlock = &lastBlockHash
} else {
header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash)
if err == nil {
peer.UpdateLastBlockHeight(int64(header.Height))
m.maybeResolveOrphanBlock(peer)
m.maybeUpdateBestAnnouncedBlock(peer, &lastBlock.Hash, &header)
}
}
}
Expand Down