Skip to content

Commit

Permalink
Optimize fork sync targets handling
Browse files Browse the repository at this point in the history
  • Loading branch information
prybalko committed Aug 28, 2023
1 parent 48415ce commit 6221582
Showing 1 changed file with 67 additions and 43 deletions.
110 changes: 67 additions & 43 deletions substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ use std::{
pin::Pin,
sync::Arc,
};
use std::collections::BTreeMap;

pub use service::chain_sync::SyncingService;

Expand Down Expand Up @@ -304,7 +305,7 @@ pub struct ChainSync<B: BlockT, Client> {
/// downloaded and are queued for import.
queue_blocks: HashSet<B::Hash>,
/// Fork sync targets.
fork_targets: HashMap<B::Hash, ForkTarget<B>>,
fork_targets: BTreeMap<(NumberFor<B>, B::Hash), ForkTarget<B>>,
/// A set of peers for which there might be potential block requests
allowed_requests: AllowedRequests,
/// A type to check incoming block announcements.
Expand Down Expand Up @@ -385,7 +386,6 @@ impl<B: BlockT> PeerSync<B> {
}

struct ForkTarget<B: BlockT> {
number: NumberFor<B>,
parent_hash: Option<B::Hash>,
peers: HashSet<PeerId>,
}
Expand Down Expand Up @@ -538,8 +538,7 @@ where

fn num_sync_requests(&self) -> usize {
self.fork_targets
.values()
.filter(|f| f.number <= self.best_queued_number)
.range(..(self.best_queued_number.saturating_add(One::one()), Default::default()))
.count()
}

Expand Down Expand Up @@ -739,8 +738,8 @@ where
}

self.fork_targets
.entry(*hash)
.or_insert_with(|| ForkTarget { number, peers: Default::default(), parent_hash: None })
.entry((number, *hash))
.or_insert_with(|| ForkTarget { peers: Default::default(), parent_hash: None })
.peers
.extend(peers);
}
Expand Down Expand Up @@ -919,9 +918,8 @@ where
who,
);
self.fork_targets
.entry(peer.best_hash)
.entry((peer.best_number, peer.best_hash))
.or_insert_with(|| ForkTarget {
number: peer.best_number,
parent_hash: None,
peers: Default::default(),
})
Expand Down Expand Up @@ -1552,7 +1550,7 @@ where
/// Updates our internal state for best queued block and then goes
/// through all peers to update our view of their state as well.
fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
if self.fork_targets.remove(hash).is_some() {
if self.fork_targets.remove(&(number, *hash)).is_some() {
trace!(target: "sync", "Completed fork sync {:?}", hash);
}
if let Some(gap_sync) = &mut self.gap_sync {
Expand Down Expand Up @@ -1727,7 +1725,7 @@ where
// known block case
if known || self.is_already_downloading(&hash) {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
if let Some(target) = self.fork_targets.get_mut(&hash) {
if let Some(target) = self.fork_targets.get_mut(&(number, hash)) {
target.peers.insert(who);
}
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
Expand All @@ -1753,9 +1751,8 @@ where
announce.summary(),
);
self.fork_targets
.entry(hash)
.entry((number, hash))
.or_insert_with(|| ForkTarget {
number,
parent_hash: Some(*announce.header.parent_hash()),
peers: Default::default(),
})
Expand Down Expand Up @@ -2984,54 +2981,54 @@ fn peer_gap_block_request<B: BlockT>(
/// Get pending fork sync targets for a peer.
fn fork_sync_request<B: BlockT>(
id: &PeerId,
targets: &mut HashMap<B::Hash, ForkTarget<B>>,
targets: &mut BTreeMap<(NumberFor<B>, B::Hash), ForkTarget<B>>,
best_num: NumberFor<B>,
finalized: NumberFor<B>,
attributes: BlockAttributes,
check_block: impl Fn(&B::Hash) -> BlockStatus,
max_blocks_per_request: u32,
) -> Option<(B::Hash, BlockRequest<B>)> {
targets.retain(|hash, r| {
if r.number <= finalized {
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number);
targets.retain(|(number, hash), _| {
if *number <= finalized {
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, number);
return false
}
if check_block(hash) != BlockStatus::Unknown {
trace!(target: "sync", "Removed obsolete fork sync request {:?} (#{})", hash, r.number);
trace!(target: "sync", "Removed obsolete fork sync request {:?} (#{})", hash, number);
return false
}
true
});
for (hash, r) in targets {
// Download the fork only if it is behind or not too far ahead our tip of the chain
// Otherwise it should be downloaded in full sync mode.
let range = ..(
best_num.saturating_add(max_blocks_per_request.into()),
Default::default(),
);
// Iterate in reverse order to download target with the highest number first.
// Other targets might belong to the same fork, so we don't need to download them separately.
for ((number, hash), r) in targets.range(range).rev() {
if !r.peers.contains(&id) {
continue
}
// Download the fork only if it is behind or not too far ahead our tip of the chain
// Otherwise it should be downloaded in full sync mode.
if r.number <= best_num ||
(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
{
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
let count = if parent_status == BlockStatus::Unknown {
(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
} else {
// request only single block
1
};
trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count);
return Some((
*hash,
BlockRequest::<B> {
id: 0,
fields: attributes,
from: FromBlock::Hash(*hash),
direction: Direction::Descending,
max: Some(count),
},
))
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
let count = if parent_status == BlockStatus::Unknown {
(*number - finalized).saturated_into::<u32>() // up to the last finalized block
} else {
trace!(target: "sync", "Fork too far in the future: {:?} (#{})", hash, r.number);
}
// request only single block
1
};
trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count);
return Some((
*hash,
BlockRequest::<B> {
id: 0,
fields: attributes,
from: FromBlock::Hash(*hash),
direction: Direction::Descending,
max: Some(count),
},
));
}
None
}
Expand Down Expand Up @@ -4188,4 +4185,31 @@ mod test {
sync.peer_disconnected(&peers[1]);
assert_eq!(sync.pending_responses.len(), 0);
}

#[test]
fn fork_sync_request_prefers_latest_fork() {
let peer_id = PeerId::random();
let mut targets = (0..200u8).fold(BTreeMap::new(), |mut sum, i| {
sum.insert(
(i.into(), Hash::random()),
ForkTarget::<Block> { parent_hash: None, peers: vec![peer_id].into_iter().collect() },
);
sum
});
let (_, block_request) = fork_sync_request(
&peer_id,
&mut targets,
100,
50,
BlockAttributes::BODY,
|_| BlockStatus::Unknown,
10,
).expect("should have block request");

// Find expected chosen target
let expected_number = 109; // 100 + 10 - 1
let ((_, expected_hash), _) = targets.range((expected_number, Default::default())..(expected_number+1, Default::default())).next().unwrap();
assert_eq!(block_request.from, FromBlock::Hash(*expected_hash));
assert_eq!(block_request.max, Some(59)); // 109 - 50
}
}

0 comments on commit 6221582

Please sign in to comment.