diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs
index dd0bd8581985..7b383e8034a7 100644
--- a/polkadot/node/core/pvf/src/host.rs
+++ b/polkadot/node/core/pvf/src/host.rs
@@ -29,12 +29,11 @@ use crate::{
use always_assert::never;
use futures::{
channel::{mpsc, oneshot},
- join, Future, FutureExt, SinkExt, StreamExt,
+ Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
pvf::PvfPrepData,
- SecurityStatus,
};
use polkadot_parachain_primitives::primitives::ValidationResult;
use std::{
@@ -208,21 +207,7 @@ pub async fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Fu
gum::debug!(target: LOG_TARGET, ?config, "starting PVF validation host");
// Run checks for supported security features once per host startup. Warn here if not enabled.
- let security_status = {
- // TODO: add check that syslog is available and that seccomp violations are logged?
- let (can_enable_landlock, can_enable_seccomp, can_unshare_user_namespace_and_change_root) = join!(
- security::check_landlock(&config.prepare_worker_program_path),
- security::check_seccomp(&config.prepare_worker_program_path),
- security::check_can_unshare_user_namespace_and_change_root(
- &config.prepare_worker_program_path
- )
- );
- SecurityStatus {
- can_enable_landlock,
- can_enable_seccomp,
- can_unshare_user_namespace_and_change_root,
- }
- };
+ let security_status = security::check_security_status(&config).await;
let (to_host_tx, to_host_rx) = mpsc::channel(10);
diff --git a/polkadot/node/core/pvf/src/security.rs b/polkadot/node/core/pvf/src/security.rs
index decd321e415e..295dd7df94dd 100644
--- a/polkadot/node/core/pvf/src/security.rs
+++ b/polkadot/node/core/pvf/src/security.rs
@@ -14,22 +14,142 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
-use crate::LOG_TARGET;
-use std::path::Path;
+use crate::{Config, SecurityStatus, LOG_TARGET};
+use futures::join;
+use std::{fmt, path::Path};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
};
-/// Check if we can sandbox the root and emit a warning if not.
+const SECURE_MODE_ANNOUNCEMENT: &'static str =
+ "In the next release this will be a hard error by default.
+ \nMore information: https://wiki.polkadot.network/docs/maintain-guides-secure-validator#secure-validator-mode";
+
+/// Run checks for supported security features.
+pub async fn check_security_status(config: &Config) -> SecurityStatus {
+ let Config { prepare_worker_program_path, .. } = config;
+
+ // TODO: add check that syslog is available and that seccomp violations are logged?
+ let (landlock, seccomp, change_root) = join!(
+ check_landlock(prepare_worker_program_path),
+ check_seccomp(prepare_worker_program_path),
+ check_can_unshare_user_namespace_and_change_root(prepare_worker_program_path)
+ );
+
+ let security_status = SecurityStatus {
+ can_enable_landlock: landlock.is_ok(),
+ can_enable_seccomp: seccomp.is_ok(),
+ can_unshare_user_namespace_and_change_root: change_root.is_ok(),
+ };
+
+    let errs: Vec<SecureModeError> = [landlock, seccomp, change_root]
+ .into_iter()
+ .filter_map(|result| result.err())
+ .collect();
+ let err_occurred = print_secure_mode_message(errs);
+ if err_occurred {
+ gum::error!(
+ target: LOG_TARGET,
+ "{}",
+ SECURE_MODE_ANNOUNCEMENT,
+ );
+ }
+
+ security_status
+}
+
+type SecureModeResult = std::result::Result<(), SecureModeError>;
+
+/// Errors related to enabling Secure Validator Mode.
+#[derive(Debug)]
+enum SecureModeError {
+ CannotEnableLandlock(String),
+ CannotEnableSeccomp(String),
+ CannotUnshareUserNamespaceAndChangeRoot(String),
+}
+
+impl SecureModeError {
+ /// Whether this error is allowed with Secure Validator Mode enabled.
+ fn is_allowed_in_secure_mode(&self) -> bool {
+ use SecureModeError::*;
+ match self {
+ CannotEnableLandlock(_) => true,
+ CannotEnableSeccomp(_) => false,
+ CannotUnshareUserNamespaceAndChangeRoot(_) => false,
+ }
+ }
+}
+
+impl fmt::Display for SecureModeError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ use SecureModeError::*;
+ match self {
+ CannotEnableLandlock(err) => write!(f, "Cannot enable landlock, a Linux 5.13+ kernel security feature: {err}"),
+ CannotEnableSeccomp(err) => write!(f, "Cannot enable seccomp, a Linux-specific kernel security feature: {err}"),
+ CannotUnshareUserNamespaceAndChangeRoot(err) => write!(f, "Cannot unshare user namespace and change root, which are Linux-specific kernel security features: {err}"),
+ }
+ }
+}
+
+/// Errors if Secure Validator Mode and some mandatory errors occurred, warn otherwise.
+///
+/// # Returns
+///
+/// `true` if an error was printed, `false` otherwise.
+fn print_secure_mode_message(errs: Vec<SecureModeError>) -> bool {
+ // Trying to run securely and some mandatory errors occurred.
+ const SECURE_MODE_ERROR: &'static str = "🚨 Your system cannot securely run a validator. \
+ \nRunning validation of malicious PVF code has a higher risk of compromising this machine.";
+ // Some errors occurred when running insecurely, or some optional errors occurred when running
+ // securely.
+ const SECURE_MODE_WARNING: &'static str = "🚨 Some security issues have been detected. \
+ \nRunning validation of malicious PVF code has a higher risk of compromising this machine.";
+
+ if errs.is_empty() {
+ return false
+ }
+
+ let errs_allowed = errs.iter().all(|err| err.is_allowed_in_secure_mode());
+ let errs_string: String = errs
+ .iter()
+ .map(|err| {
+ format!(
+ "\n - {}{}",
+ if err.is_allowed_in_secure_mode() { "Optional: " } else { "" },
+ err
+ )
+ })
+ .collect();
+
+ if errs_allowed {
+ gum::warn!(
+ target: LOG_TARGET,
+ "{}{}",
+ SECURE_MODE_WARNING,
+ errs_string,
+ );
+ false
+ } else {
+ gum::error!(
+ target: LOG_TARGET,
+ "{}{}",
+ SECURE_MODE_ERROR,
+ errs_string,
+ );
+ true
+ }
+}
+
+/// Check if we can change root to a new, sandboxed root and return an error if not.
///
/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible
/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on
/// success and -1 on failure.
-pub async fn check_can_unshare_user_namespace_and_change_root(
+async fn check_can_unshare_user_namespace_and_change_root(
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
prepare_worker_program_path: &Path,
-) -> bool {
+) -> SecureModeResult {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
match tokio::process::Command::new(prepare_worker_program_path)
@@ -37,50 +157,37 @@ pub async fn check_can_unshare_user_namespace_and_change_root(
.output()
.await
{
- Ok(output) if output.status.success() => true,
+ Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
let stderr = std::str::from_utf8(&output.stderr)
.expect("child process writes a UTF-8 string to stderr; qed")
.trim();
- gum::warn!(
- target: LOG_TARGET,
- ?prepare_worker_program_path,
- // Docs say to always print status using `Display` implementation.
- status = %output.status,
- %stderr,
- "Cannot unshare user namespace and change root, which are Linux-specific kernel security features. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running with support for unsharing user namespaces for maximum security."
- );
- false
- },
- Err(err) => {
- gum::warn!(
- target: LOG_TARGET,
- ?prepare_worker_program_path,
- "Could not start child process: {}",
- err
- );
- false
+ Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
+ format!("not available: {}", stderr)
+ ))
},
+ Err(err) =>
+ Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
+ format!("could not start child process: {}", err)
+ )),
}
} else {
- gum::warn!(
- target: LOG_TARGET,
- "Cannot unshare user namespace and change root, which are Linux-specific kernel security features. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with support for unsharing user namespaces for maximum security."
- );
- false
+ Err(SecureModeError::CannotUnshareUserNamespaceAndChangeRoot(
+ "only available on Linux".into()
+ ))
}
}
}
-/// Check if landlock is supported and emit a warning if not.
+/// Check if landlock is supported and return an error if not.
///
/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible
/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on
/// success and -1 on failure.
-pub async fn check_landlock(
+async fn check_landlock(
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
prepare_worker_program_path: &Path,
-) -> bool {
+) -> SecureModeResult {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
match tokio::process::Command::new(prepare_worker_program_path)
@@ -88,81 +195,73 @@ pub async fn check_landlock(
.status()
.await
{
- Ok(status) if status.success() => true,
- Ok(status) => {
+ Ok(status) if status.success() => Ok(()),
+ Ok(_status) => {
let abi =
polkadot_node_core_pvf_common::worker::security::landlock::LANDLOCK_ABI as u8;
- gum::warn!(
- target: LOG_TARGET,
- ?prepare_worker_program_path,
- ?status,
- %abi,
- "Cannot fully enable landlock, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider upgrading the kernel version for maximum security."
- );
- false
- },
- Err(err) => {
- gum::warn!(
- target: LOG_TARGET,
- ?prepare_worker_program_path,
- "Could not start child process: {}",
- err
- );
- false
+ Err(SecureModeError::CannotEnableLandlock(
+ format!("landlock ABI {} not available", abi)
+ ))
},
+ Err(err) =>
+ Err(SecureModeError::CannotEnableLandlock(
+ format!("could not start child process: {}", err)
+ )),
}
} else {
- gum::warn!(
- target: LOG_TARGET,
- "Cannot enable landlock, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with landlock support for maximum security."
- );
- false
+ Err(SecureModeError::CannotEnableLandlock(
+ "only available on Linux".into()
+ ))
}
}
}
-/// Check if seccomp is supported and emit a warning if not.
+/// Check if seccomp is supported and return an error if not.
///
/// We do this check by spawning a new process and trying to sandbox it. To get as close as possible
/// to running the check in a worker, we try it... in a worker. The expected return status is 0 on
/// success and -1 on failure.
-pub async fn check_seccomp(
+async fn check_seccomp(
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
prepare_worker_program_path: &Path,
-) -> bool {
+) -> SecureModeResult {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
- match tokio::process::Command::new(prepare_worker_program_path)
- .arg("--check-can-enable-seccomp")
- .status()
- .await
- {
- Ok(status) if status.success() => true,
- Ok(status) => {
- gum::warn!(
- target: LOG_TARGET,
- ?prepare_worker_program_path,
- ?status,
- "Cannot fully enable seccomp, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider upgrading the kernel version for maximum security."
- );
- false
- },
- Err(err) => {
- gum::warn!(
- target: LOG_TARGET,
- ?prepare_worker_program_path,
- "Could not start child process: {}",
- err
- );
- false
- },
+ cfg_if::cfg_if! {
+ if #[cfg(target_arch = "x86_64")] {
+ match tokio::process::Command::new(prepare_worker_program_path)
+ .arg("--check-can-enable-seccomp")
+ .status()
+ .await
+ {
+ Ok(status) if status.success() => Ok(()),
+ Ok(_status) =>
+ Err(SecureModeError::CannotEnableSeccomp(
+ "not available".into()
+ )),
+ Err(err) =>
+ Err(SecureModeError::CannotEnableSeccomp(
+ format!("could not start child process: {}", err)
+ )),
+ }
+ } else {
+ Err(SecureModeError::CannotEnableSeccomp(
+ "only supported on CPUs from the x86_64 family (usually Intel or AMD)".into()
+ ))
+ }
}
} else {
- gum::warn!(
- target: LOG_TARGET,
- "Cannot enable seccomp, a Linux-specific kernel security feature. Running validation of malicious PVF code has a higher risk of compromising this machine. Consider running on Linux with seccomp support for maximum security."
- );
- false
+ cfg_if::cfg_if! {
+ if #[cfg(target_arch = "x86_64")] {
+ Err(SecureModeError::CannotEnableSeccomp(
+ "only supported on Linux".into()
+ ))
+ } else {
+ Err(SecureModeError::CannotEnableSeccomp(
+ "only supported on Linux and on CPUs from the x86_64 family (usually Intel or AMD).".into()
+ ))
+ }
+ }
}
}
}
diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs
index 858125f93f1f..2adc6d423415 100644
--- a/substrate/client/network/sync/src/chain_sync.rs
+++ b/substrate/client/network/sync/src/chain_sync.rs
@@ -184,90 +184,26 @@ struct GapSync {
target: NumberFor,
}
-/// Action that the parent of [`ChainSync`] should perform after reporting imported blocks with
-/// [`ChainSync::on_blocks_processed`].
-pub enum BlockRequestAction {
+/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event.
+#[derive(Debug)]
+pub enum ChainSyncAction {
/// Send block request to peer. Always implies dropping a stale block request to the same peer.
- SendRequest { peer_id: PeerId, request: BlockRequest },
+ SendBlockRequest { peer_id: PeerId, request: BlockRequest },
/// Drop stale block request.
- RemoveStale { peer_id: PeerId },
-}
-
-/// Action that the parent of [`ChainSync`] should perform if we want to import blocks.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct ImportBlocksAction {
- pub origin: BlockOrigin,
- pub blocks: Vec>,
-}
-
-/// Action that the parent of [`ChainSync`] should perform if we want to import justifications.
-pub struct ImportJustificationsAction {
- pub peer_id: PeerId,
- pub hash: B::Hash,
- pub number: NumberFor,
- pub justifications: Justifications,
-}
-
-/// Result of [`ChainSync::on_block_data`].
-#[derive(Debug, Clone, PartialEq, Eq)]
-enum OnBlockData {
- /// The block should be imported.
- Import(ImportBlocksAction),
- /// A new block request needs to be made to the given peer.
- Request(PeerId, BlockRequest),
- /// Continue processing events.
- Continue,
-}
-
-/// Result of [`ChainSync::on_block_justification`].
-#[derive(Debug, Clone, PartialEq, Eq)]
-enum OnBlockJustification {
- /// The justification needs no further handling.
- Nothing,
- /// The justification should be imported.
- Import {
+ CancelBlockRequest { peer_id: PeerId },
+ /// Peer misbehaved. Disconnect, report it and cancel the block request to it.
+ DropPeer(BadPeer),
+ /// Import blocks.
+ ImportBlocks { origin: BlockOrigin, blocks: Vec> },
+ /// Import justifications.
+ ImportJustifications {
peer_id: PeerId,
- hash: Block::Hash,
- number: NumberFor,
+ hash: B::Hash,
+ number: NumberFor,
justifications: Justifications,
},
}
-// Result of [`ChainSync::on_state_data`].
-#[derive(Debug)]
-enum OnStateData {
- /// The block and state that should be imported.
- Import(BlockOrigin, IncomingBlock),
- /// A new state request needs to be made to the given peer.
- Continue,
-}
-
-/// Action that the parent of [`ChainSync`] should perform after reporting block response with
-/// [`ChainSync::on_block_response`].
-pub enum OnBlockResponse {
- /// Nothing to do.
- Nothing,
- /// Perform block request.
- SendBlockRequest { peer_id: PeerId, request: BlockRequest },
- /// Import blocks.
- ImportBlocks(ImportBlocksAction),
- /// Import justifications.
- ImportJustifications(ImportJustificationsAction),
- /// Invalid block response, the peer should be disconnected and reported.
- DisconnectPeer(BadPeer),
-}
-
-/// Action that the parent of [`ChainSync`] should perform after reporting state response with
-/// [`ChainSync::on_state_response`].
-pub enum OnStateResponse {
- /// Nothing to do.
- Nothing,
- /// Import blocks.
- ImportBlocks(ImportBlocksAction),
- /// Invalid state response, the peer should be disconnected and reported.
- DisconnectPeer(BadPeer),
-}
-
/// The main data structure which contains all the state for a chains
/// active syncing strategy.
pub struct ChainSync {
@@ -313,6 +249,8 @@ pub struct ChainSync {
import_existing: bool,
/// Gap download process.
gap_sync: Option>,
+ /// Pending actions.
+ actions: Vec>,
}
/// All the data we have about a Peer that we are trying to sync with
@@ -427,6 +365,7 @@ where
gap_sync: None,
warp_sync_config,
warp_sync_target_block_header: None,
+ actions: Vec::new(),
};
sync.reset_sync_start_point()?;
@@ -509,8 +448,17 @@ where
}
/// Notify syncing state machine that a new sync peer has connected.
+ pub fn new_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) {
+ match self.new_peer_inner(peer_id, best_hash, best_number) {
+ Ok(Some(request)) =>
+ self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }),
+ Ok(None) => {},
+ Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)),
+ }
+ }
+
#[must_use]
- pub fn new_peer(
+ fn new_peer_inner(
&mut self,
peer_id: PeerId,
best_hash: B::Hash,
@@ -727,7 +675,7 @@ where
peer_id: &PeerId,
request: Option>,
response: BlockResponse,
- ) -> Result, BadPeer> {
+ ) -> Result<(), BadPeer> {
self.downloaded_blocks += response.blocks.len();
let mut gap = false;
let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(peer_id) {
@@ -892,10 +840,12 @@ where
start: *start,
state: next_state,
};
- return Ok(OnBlockData::Request(
- *peer_id,
- ancestry_request::(next_num),
- ))
+ let request = ancestry_request::(next_num);
+ self.actions.push(ChainSyncAction::SendBlockRequest {
+ peer_id: *peer_id,
+ request,
+ });
+ return Ok(())
} else {
// Ancestry search is complete. Check if peer is on a stale fork unknown
// to us and add it to sync targets if necessary.
@@ -929,7 +879,7 @@ where
.insert(*peer_id);
}
peer.state = PeerSyncState::Available;
- Vec::new()
+ return Ok(())
}
},
PeerSyncState::DownloadingWarpTargetBlock => {
@@ -940,8 +890,7 @@ where
match warp_sync.import_target_block(
blocks.pop().expect("`blocks` len checked above."),
) {
- warp::TargetBlockImportResult::Success =>
- return Ok(OnBlockData::Continue),
+ warp::TargetBlockImportResult::Success => return Ok(()),
warp::TargetBlockImportResult::BadResponse =>
return Err(BadPeer(*peer_id, rep::VERIFICATION_FAIL)),
}
@@ -963,7 +912,7 @@ where
"Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.",
peer_id,
);
- return Ok(OnBlockData::Continue)
+ return Ok(())
}
},
PeerSyncState::Available |
@@ -1000,7 +949,9 @@ where
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
};
- Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap)))
+ self.validate_and_queue_blocks(new_blocks, gap);
+
+ Ok(())
}
/// Submit a justification response for processing.
@@ -1009,7 +960,7 @@ where
&mut self,
peer_id: PeerId,
response: BlockResponse,
- ) -> Result, BadPeer> {
+ ) -> Result<(), BadPeer> {
let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
peer
} else {
@@ -1017,7 +968,7 @@ where
target: LOG_TARGET,
"💔 Called on_block_justification with a peer ID of an unknown peer",
);
- return Ok(OnBlockJustification::Nothing)
+ return Ok(())
};
self.allowed_requests.add(&peer_id);
@@ -1054,11 +1005,17 @@ where
if let Some((peer_id, hash, number, justifications)) =
self.extra_justifications.on_response(peer_id, justification)
{
- return Ok(OnBlockJustification::Import { peer_id, hash, number, justifications })
+ self.actions.push(ChainSyncAction::ImportJustifications {
+ peer_id,
+ hash,
+ number,
+ justifications,
+ });
+ return Ok(())
}
}
- Ok(OnBlockJustification::Nothing)
+ Ok(())
}
/// Report a justification import (successful or not).
@@ -1196,8 +1153,7 @@ where
}
/// Notify that a sync peer has disconnected.
- #[must_use]
- pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Option> {
+ pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.blocks.clear_peer_download(peer_id);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_peer_download(peer_id)
@@ -1212,7 +1168,9 @@ where
let blocks = self.ready_blocks();
- (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false))
+ if !blocks.is_empty() {
+ self.validate_and_queue_blocks(blocks, false);
+ }
}
/// Get prometheus metrics.
@@ -1259,11 +1217,7 @@ where
}
}
- fn validate_and_queue_blocks(
- &mut self,
- mut new_blocks: Vec>,
- gap: bool,
- ) -> ImportBlocksAction {
+ fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec>, gap: bool) {
let orig_len = new_blocks.len();
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
if new_blocks.len() != orig_len {
@@ -1295,7 +1249,7 @@ where
}
self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
- ImportBlocksAction { origin, blocks: new_blocks }
+ self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks })
}
fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) {
@@ -1346,7 +1300,7 @@ where
/// Restart the sync process. This will reset all pending block requests and return an iterator
/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
/// their state was `DownloadingJustification`) are unaffected and will stay in the same state.
- fn restart(&mut self) -> impl Iterator- , BadPeer>> + '_ {
+ fn restart(&mut self) {
self.blocks.clear();
if let Err(e) = self.reset_sync_start_point() {
warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}");
@@ -1360,7 +1314,7 @@ where
);
let old_peers = std::mem::take(&mut self.peers);
- old_peers.into_iter().filter_map(move |(peer_id, mut p)| {
+ old_peers.into_iter().for_each(|(peer_id, mut p)| {
// peers that were downloading justifications
// should be kept in that state.
if let PeerSyncState::DownloadingJustification(_) = p.state {
@@ -1374,19 +1328,21 @@ where
);
p.common_number = self.best_queued_number;
self.peers.insert(peer_id, p);
- return None
+ return
}
// handle peers that were in other states.
- match self.new_peer(peer_id, p.best_hash, p.best_number) {
+ let action = match self.new_peer_inner(peer_id, p.best_hash, p.best_number) {
// since the request is not a justification, remove it from pending responses
- Ok(None) => Some(Ok(BlockRequestAction::RemoveStale { peer_id })),
+ Ok(None) => ChainSyncAction::CancelBlockRequest { peer_id },
// update the request if the new one is available
- Ok(Some(request)) => Some(Ok(BlockRequestAction::SendRequest { peer_id, request })),
+ Ok(Some(request)) => ChainSyncAction::SendBlockRequest { peer_id, request },
// this implies that we need to drop pending response from the peer
- Err(e) => Some(Err(e)),
- }
- })
+ Err(bad_peer) => ChainSyncAction::DropPeer(bad_peer),
+ };
+
+ self.actions.push(action);
+ });
}
/// Find a block to start sync from. If we sync with state, that's the latest block we have
@@ -1534,13 +1490,12 @@ where
}
/// Submit blocks received in a response.
- #[must_use]
pub fn on_block_response(
&mut self,
peer_id: PeerId,
request: BlockRequest,
blocks: Vec>,
- ) -> OnBlockResponse {
+ ) {
let block_response = BlockResponse:: { id: request.id, blocks };
let blocks_range = || match (
@@ -1563,41 +1518,21 @@ where
blocks_range(),
);
- if request.fields == BlockAttributes::JUSTIFICATION {
- match self.on_block_justification(peer_id, block_response) {
- Ok(OnBlockJustification::Nothing) => OnBlockResponse::Nothing,
- Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) =>
- OnBlockResponse::ImportJustifications(ImportJustificationsAction {
- peer_id,
- hash,
- number,
- justifications,
- }),
- Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer),
- }
+ let res = if request.fields == BlockAttributes::JUSTIFICATION {
+ self.on_block_justification(peer_id, block_response)
} else {
- match self.on_block_data(&peer_id, Some(request), block_response) {
- Ok(OnBlockData::Import(action)) => OnBlockResponse::ImportBlocks(action),
- Ok(OnBlockData::Request(peer_id, request)) =>
- OnBlockResponse::SendBlockRequest { peer_id, request },
- Ok(OnBlockData::Continue) => OnBlockResponse::Nothing,
- Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer),
- }
+ self.on_block_data(&peer_id, Some(request), block_response)
+ };
+
+ if let Err(bad_peer) = res {
+ self.actions.push(ChainSyncAction::DropPeer(bad_peer));
}
}
/// Submit a state received in a response.
- #[must_use]
- pub fn on_state_response(
- &mut self,
- peer_id: PeerId,
- response: OpaqueStateResponse,
- ) -> OnStateResponse {
- match self.on_state_data(&peer_id, response) {
- Ok(OnStateData::Import(origin, block)) =>
- OnStateResponse::ImportBlocks(ImportBlocksAction { origin, blocks: vec![block] }),
- Ok(OnStateData::Continue) => OnStateResponse::Nothing,
- Err(bad_peer) => OnStateResponse::DisconnectPeer(bad_peer),
+ pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) {
+ if let Err(bad_peer) = self.on_state_data(&peer_id, response) {
+ self.actions.push(ChainSyncAction::DropPeer(bad_peer));
}
}
@@ -1833,11 +1768,12 @@ where
None
}
+ #[must_use]
fn on_state_data(
&mut self,
peer_id: &PeerId,
response: OpaqueStateResponse,
- ) -> Result, BadPeer> {
+ ) -> Result<(), BadPeer> {
let response: Box = response.0.downcast().map_err(|_error| {
error!(
target: LOG_TARGET,
@@ -1892,9 +1828,10 @@ where
state: Some(state),
};
debug!(target: LOG_TARGET, "State download is complete. Import is queued");
- Ok(OnStateData::Import(origin, block))
+ self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] });
+ Ok(())
},
- ImportResult::Continue => Ok(OnStateData::Continue),
+ ImportResult::Continue => Ok(()),
ImportResult::BadResponse => {
debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
Err(BadPeer(*peer_id, rep::BAD_BLOCK))
@@ -1903,12 +1840,7 @@ where
}
/// Submit a warp proof response received.
- #[must_use]
- pub fn on_warp_sync_response(
- &mut self,
- peer_id: &PeerId,
- response: EncodedProof,
- ) -> Result<(), BadPeer> {
+ pub fn on_warp_sync_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
if let Some(peer) = self.peers.get_mut(peer_id) {
if let PeerSyncState::DownloadingWarpProof = peer.state {
peer.state = PeerSyncState::Available;
@@ -1925,14 +1857,16 @@ where
sync.import_warp_proof(response)
} else {
debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {peer_id}");
- return Err(BadPeer(*peer_id, rep::NOT_REQUESTED))
+ self.actions
+ .push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::NOT_REQUESTED)));
+ return
};
match import_result {
- WarpProofImportResult::Success => Ok(()),
+ WarpProofImportResult::Success => {},
WarpProofImportResult::BadResponse => {
debug!(target: LOG_TARGET, "Bad proof data received from {peer_id}");
- Err(BadPeer(*peer_id, rep::BAD_BLOCK))
+ self.actions.push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::BAD_BLOCK)));
},
}
}
@@ -1942,17 +1876,14 @@ where
/// Call this when a batch of blocks have been processed by the import
/// queue, with or without errors. If an error is returned, the pending response
/// from the peer must be dropped.
- #[must_use]
pub fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result>, BlockImportError>, B::Hash)>,
- ) -> Box, BadPeer>>> {
+ ) {
trace!(target: LOG_TARGET, "Imported {imported} of {count}");
- let mut output = Vec::new();
-
let mut has_error = false;
for (_, hash) in &results {
self.queue_blocks.remove(hash);
@@ -1993,7 +1924,10 @@ where
if aux.bad_justification {
if let Some(ref peer) = peer_id {
warn!("💔 Sent block with bad justification to import");
- output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION)));
+ self.actions.push(ChainSyncAction::DropPeer(BadPeer(
+ *peer,
+ rep::BAD_JUSTIFICATION,
+ )));
}
}
@@ -2010,7 +1944,7 @@ where
);
self.state_sync = None;
self.mode = SyncMode::Full;
- output.extend(self.restart());
+ self.restart();
}
let warp_sync_complete = self
.warp_sync
@@ -2024,7 +1958,7 @@ where
);
self.warp_sync = None;
self.mode = SyncMode::Full;
- output.extend(self.restart());
+ self.restart();
}
let gap_sync_complete =
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
@@ -2042,8 +1976,9 @@ where
target: LOG_TARGET,
"💔 Peer sent block with incomplete header to import",
);
- output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
- output.extend(self.restart());
+ self.actions
+ .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
+ self.restart();
},
Err(BlockImportError::VerificationFailed(peer_id, e)) => {
let extra_message = peer_id
@@ -2055,10 +1990,11 @@ where
);
if let Some(peer) = peer_id {
- output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
+ self.actions
+ .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
}
- output.extend(self.restart());
+ self.restart();
},
Err(BlockImportError::BadBlock(peer_id)) =>
if let Some(peer) = peer_id {
@@ -2066,7 +2002,7 @@ where
target: LOG_TARGET,
"💔 Block {hash:?} received from peer {peer} has been blacklisted",
);
- output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
+ self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
@@ -2078,14 +2014,19 @@ where
warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
self.state_sync = None;
self.warp_sync = None;
- output.extend(self.restart());
+ self.restart();
},
Err(BlockImportError::Cancelled) => {},
};
}
self.allowed_requests.set_all();
- Box::new(output.into_iter())
+ }
+
+ /// Get pending actions to perform.
+ #[must_use]
+    pub fn take_actions(&mut self) -> impl Iterator<Item = ChainSyncAction<B>> {
+        std::mem::take(&mut self.actions).into_iter()
}
}
diff --git a/substrate/client/network/sync/src/chain_sync/test.rs b/substrate/client/network/sync/src/chain_sync/test.rs
index 2eefd2ad13ef..15b2a95a07c8 100644
--- a/substrate/client/network/sync/src/chain_sync/test.rs
+++ b/substrate/client/network/sync/src/chain_sync/test.rs
@@ -53,7 +53,7 @@ fn processes_empty_response_on_justification_request_for_unknown_block() {
};
// add a new peer with the same best block
- sync.new_peer(peer_id, a1_hash, a1_number).unwrap();
+ sync.new_peer(peer_id, a1_hash, a1_number);
// and request a justification for the block
sync.request_justification(&a1_hash, a1_number);
@@ -74,10 +74,8 @@ fn processes_empty_response_on_justification_request_for_unknown_block() {
// if the peer replies with an empty response (i.e. it doesn't know the block),
// the active request should be cleared.
- assert_eq!(
- sync.on_block_justification(peer_id, BlockResponse:: { id: 0, blocks: vec![] }),
- Ok(OnBlockJustification::Nothing),
- );
+ sync.on_block_justification(peer_id, BlockResponse:: { id: 0, blocks: vec![] })
+ .unwrap();
// there should be no in-flight requests
assert_eq!(sync.extra_justifications.active_requests().count(), 0);
@@ -119,8 +117,8 @@ fn restart_doesnt_affect_peers_downloading_finality_data() {
let (b1_hash, b1_number) = new_blocks(50);
// add 2 peers at blocks that we don't have locally
- sync.new_peer(peer_id1, Hash::random(), 42).unwrap();
- sync.new_peer(peer_id2, Hash::random(), 10).unwrap();
+ sync.new_peer(peer_id1, Hash::random(), 42);
+ sync.new_peer(peer_id2, Hash::random(), 10);
// we wil send block requests to these peers
// for these blocks we don't know about
@@ -130,7 +128,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() {
.all(|(p, _)| { p == peer_id1 || p == peer_id2 }));
// add a new peer at a known block
- sync.new_peer(peer_id3, b1_hash, b1_number).unwrap();
+ sync.new_peer(peer_id3, b1_hash, b1_number);
// we request a justification for a block we have locally
sync.request_justification(&b1_hash, b1_number);
@@ -148,14 +146,19 @@ fn restart_doesnt_affect_peers_downloading_finality_data() {
PeerSyncState::DownloadingJustification(b1_hash),
);
+ // clear old actions
+ let _ = sync.take_actions();
+
// we restart the sync state
- let block_requests = sync.restart();
+ sync.restart();
+ let actions = sync.take_actions().collect::<Vec<_>>();
// which should make us send out block requests to the first two peers
- assert!(block_requests.map(|r| r.unwrap()).all(|event| match event {
- BlockRequestAction::SendRequest { peer_id, .. } =>
- peer_id == peer_id1 || peer_id == peer_id2,
- BlockRequestAction::RemoveStale { .. } => false,
+ assert_eq!(actions.len(), 2);
+ assert!(actions.iter().all(|action| match action {
+ ChainSyncAction::SendBlockRequest { peer_id, .. } =>
+ peer_id == &peer_id1 || peer_id == &peer_id2,
+ _ => false,
}));
// peer 3 should be unaffected it was downloading finality data
@@ -166,7 +169,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() {
// Set common block to something that we don't have (e.g. failed import)
sync.peers.get_mut(&peer_id3).unwrap().common_number = 100;
- let _ = sync.restart().count();
+ sync.restart();
assert_eq!(sync.peers.get(&peer_id3).unwrap().common_number, 50);
}
@@ -280,9 +283,8 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() {
let best_block = blocks.last().unwrap().clone();
let max_blocks_to_request = sync.max_blocks_per_request;
// Connect the node we will sync from
- sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number())
- .unwrap();
- sync.new_peer(peer_id2, info.best_hash, 0).unwrap();
+ sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number());
+ sync.new_peer(peer_id2, info.best_hash, 0);
let mut best_block_num = 0;
while best_block_num < MAX_DOWNLOAD_AHEAD {
@@ -300,11 +302,17 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() {
let response = create_block_response(resp_blocks.clone());
- let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
+ // Clear old actions to not deal with them
+ let _ = sync.take_actions();
+
+ sync.on_block_data(&peer_id1, Some(request), response).unwrap();
+
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ assert_eq!(actions.len(), 1);
assert!(matches!(
- res,
- OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize
- ),);
+ &actions[0],
+ ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize,
+ ));
best_block_num += max_blocks_to_request as u32;
@@ -356,11 +364,14 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() {
assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from);
let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]);
- let res = sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap();
- assert!(matches!(
- res,
- OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
- ),);
+
+ // Clear old actions to not deal with them
+ let _ = sync.take_actions();
+
+ sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap();
+
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ assert!(actions.is_empty());
let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from);
@@ -421,25 +432,34 @@ fn can_sync_huge_fork() {
let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone();
// Connect the node we will sync from
- sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number())
- .unwrap();
+ sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number());
send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);
let mut request =
get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1);
+ // Discard old actions we are not interested in
+ let _ = sync.take_actions();
+
// Do the ancestor search
loop {
let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
- let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
- request = if let OnBlockData::Request(_peer, request) = on_block_data {
- request
- } else {
+ sync.on_block_data(&peer_id1, Some(request), response).unwrap();
+
+ let actions = sync.take_actions().collect::<Vec<_>>();
+
+ request = if actions.is_empty() {
// We found the ancenstor
break
+ } else {
+ assert_eq!(actions.len(), 1);
+ match &actions[0] {
+ ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(),
+ action @ _ => panic!("Unexpected action: {action:?}"),
+ }
};
log::trace!(target: LOG_TARGET, "Request: {request:?}");
@@ -463,15 +483,18 @@ fn can_sync_huge_fork() {
let response = create_block_response(resp_blocks.clone());
- let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
+ sync.on_block_data(&peer_id1, Some(request), response).unwrap();
+
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ assert_eq!(actions.len(), 1);
assert!(matches!(
- res,
- OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == sync.max_blocks_per_request as usize
- ),);
+ &actions[0],
+ ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize
+ ));
best_block_num += sync.max_blocks_per_request as u32;
- let _ = sync.on_blocks_processed(
+ sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
resp_blocks
@@ -490,6 +513,9 @@ fn can_sync_huge_fork() {
.collect(),
);
+ // Discard pending actions
+ let _ = sync.take_actions();
+
resp_blocks
.into_iter()
.rev()
@@ -539,25 +565,34 @@ fn syncs_fork_without_duplicate_requests() {
let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone();
// Connect the node we will sync from
- sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number())
- .unwrap();
+ sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number());
send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);
let mut request =
get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1);
+ // Discard pending actions
+ let _ = sync.take_actions();
+
// Do the ancestor search
loop {
let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
- let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap();
- request = if let OnBlockData::Request(_peer, request) = on_block_data {
- request
- } else {
+ sync.on_block_data(&peer_id1, Some(request), response).unwrap();
+
+ let actions = sync.take_actions().collect::<Vec<_>>();
+
+ request = if actions.is_empty() {
// We found the ancenstor
break
+ } else {
+ assert_eq!(actions.len(), 1);
+ match &actions[0] {
+ ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(),
+ action @ _ => panic!("Unexpected action: {action:?}"),
+ }
};
log::trace!(target: LOG_TARGET, "Request: {request:?}");
@@ -582,11 +617,17 @@ fn syncs_fork_without_duplicate_requests() {
let response = create_block_response(resp_blocks.clone());
- let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
+ // Discard old actions
+ let _ = sync.take_actions();
+
+ sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
+
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ assert_eq!(actions.len(), 1);
assert!(matches!(
- res,
- OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize
- ),);
+ &actions[0],
+ ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize
+ ));
best_block_num += max_blocks_to_request as u32;
@@ -653,8 +694,7 @@ fn removes_target_fork_on_disconnect() {
let peer_id1 = PeerId::random();
let common_block = blocks[1].clone();
// Connect the node we will sync from
- sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number())
- .unwrap();
+ sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number());
// Create a "new" header and announce it
let mut header = blocks[0].header().clone();
@@ -678,8 +718,7 @@ fn can_import_response_with_missing_blocks() {
let peer_id1 = PeerId::random();
let best_block = blocks[3].clone();
- sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number())
- .unwrap();
+ sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number());
sync.peers.get_mut(&peer_id1).unwrap().state = PeerSyncState::Available;
sync.peers.get_mut(&peer_id1).unwrap().common_number = 0;
@@ -730,7 +769,7 @@ fn sync_restart_removes_block_but_not_justification_requests() {
let (b1_hash, b1_number) = new_blocks(50);
// add new peer and request blocks from them
- sync.new_peer(peers[0], Hash::random(), 42).unwrap();
+ sync.new_peer(peers[0], Hash::random(), 42);
// we don't actually perform any requests, just keep track of peers waiting for a response
let mut pending_responses = HashSet::new();
@@ -743,7 +782,7 @@ fn sync_restart_removes_block_but_not_justification_requests() {
}
// add a new peer at a known block
- sync.new_peer(peers[1], b1_hash, b1_number).unwrap();
+ sync.new_peer(peers[1], b1_hash, b1_number);
// we request a justification for a block we have locally
sync.request_justification(&b1_hash, b1_number);
@@ -766,24 +805,29 @@ fn sync_restart_removes_block_but_not_justification_requests() {
);
assert_eq!(pending_responses.len(), 2);
+ // discard old actions
+ let _ = sync.take_actions();
+
// restart sync
- let request_events = sync.restart().collect::>();
- for event in request_events.iter() {
- match event.as_ref().unwrap() {
- BlockRequestAction::RemoveStale { peer_id } => {
+ sync.restart();
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ for action in actions.iter() {
+ match action {
+ ChainSyncAction::CancelBlockRequest { peer_id } => {
pending_responses.remove(&peer_id);
},
- BlockRequestAction::SendRequest { peer_id, .. } => {
+ ChainSyncAction::SendBlockRequest { peer_id, .. } => {
// we drop obsolete response, but don't register a new request, it's checked in
// the `assert!` below
pending_responses.remove(&peer_id);
},
+ action @ _ => panic!("Unexpected action: {action:?}"),
}
}
- assert!(request_events.iter().any(|event| {
- match event.as_ref().unwrap() {
- BlockRequestAction::RemoveStale { .. } => false,
- BlockRequestAction::SendRequest { peer_id, .. } => peer_id == &peers[0],
+ assert!(actions.iter().any(|action| {
+ match action {
+ ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0],
+ _ => false,
}
}));
@@ -848,11 +892,9 @@ fn request_across_forks() {
// Add the peers, all at the common ancestor 100.
let common_block = blocks.last().unwrap();
let peer_id1 = PeerId::random();
- sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number())
- .unwrap();
+ sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number());
let peer_id2 = PeerId::random();
- sync.new_peer(peer_id2, common_block.hash(), *common_block.header().number())
- .unwrap();
+ sync.new_peer(peer_id2, common_block.hash(), *common_block.header().number());
// Peer 1 announces 107 from fork 1, 100-107 get downloaded.
{
@@ -864,11 +906,17 @@ fn request_across_forks() {
let mut resp_blocks = fork_a_blocks[100_usize..107_usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
- let res = sync.on_block_data(&peer, Some(request), response).unwrap();
+
+ // Drop old actions
+ let _ = sync.take_actions();
+
+ sync.on_block_data(&peer, Some(request), response).unwrap();
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ assert_eq!(actions.len(), 1);
assert!(matches!(
- res,
- OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 7_usize
- ),);
+ &actions[0],
+ ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize
+ ));
assert_eq!(sync.best_queued_number, 107);
assert_eq!(sync.best_queued_hash, block.hash());
assert!(sync.is_known(&block.header.parent_hash()));
@@ -903,11 +951,17 @@ fn request_across_forks() {
// block is announced.
let request = get_block_request(&mut sync, FromBlock::Hash(block.hash()), 1, &peer);
let response = create_block_response(vec![block.clone()]);
- let res = sync.on_block_data(&peer, Some(request), response).unwrap();
+
+ // Drop old actions we are not going to check
+ let _ = sync.take_actions();
+
+ sync.on_block_data(&peer, Some(request), response).unwrap();
+ let actions = sync.take_actions().collect::<Vec<_>>();
+ assert_eq!(actions.len(), 1);
assert!(matches!(
- res,
- OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 1_usize
- ),);
+ &actions[0],
+ ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize
+ ));
assert!(sync.is_known(&block.header.parent_hash()));
}
}
diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs
index 560887132e3a..58a9fdc49f20 100644
--- a/substrate/client/network/sync/src/engine.rs
+++ b/substrate/client/network/sync/src/engine.rs
@@ -25,10 +25,7 @@ use crate::{
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
- chain_sync::{
- BlockRequestAction, ChainSync, ImportBlocksAction, ImportJustificationsAction,
- OnBlockResponse, OnStateResponse,
- },
+ chain_sync::{ChainSync, ChainSyncAction},
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{
@@ -58,7 +55,7 @@ use schnellru::{ByLength, LruMap};
use tokio::time::{Interval, MissedTickBehavior};
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
-use sc_consensus::import_queue::ImportQueueService;
+use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
use sc_network::{
config::{
FullNetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake,
@@ -74,8 +71,11 @@ use sc_network_common::{
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::{Error as ClientError, HeaderMetadata};
-use sp_consensus::block_validation::BlockAnnounceValidator;
-use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero};
+use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
+use sp_runtime::{
+ traits::{Block as BlockT, Header, NumberFor, Zero},
+ Justifications,
+};
use std::{
collections::{HashMap, HashSet},
@@ -713,11 +713,67 @@ where
self.is_major_syncing
.store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed);
+ // Process actions requested by `ChainSync` during `select!`.
+ self.process_chain_sync_actions();
+
// Send outbound requests on `ChanSync`'s behalf.
self.send_chain_sync_requests();
}
}
+ fn process_chain_sync_actions(&mut self) {
+ self.chain_sync.take_actions().for_each(|action| match action {
+ ChainSyncAction::SendBlockRequest { peer_id, request } => {
+ // Sending block request implies dropping obsolete pending response as we are not
+ // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]).
+ // Furthermore, only one request at a time is allowed to any peer.
+ let removed = self.pending_responses.remove(&peer_id);
+ self.send_block_request(peer_id, request.clone());
+
+ trace!(
+ target: LOG_TARGET,
+ "Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
+ peer_id,
+ request,
+ removed,
+ )
+ },
+ ChainSyncAction::CancelBlockRequest { peer_id } => {
+ let removed = self.pending_responses.remove(&peer_id);
+
+ trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}.");
+ },
+ ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => {
+ self.pending_responses.remove(&peer_id);
+ self.network_service
+ .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
+ self.network_service.report_peer(peer_id, rep);
+
+ trace!(target: LOG_TARGET, "Processed {action:?}.");
+ },
+ ChainSyncAction::ImportBlocks { origin, blocks } => {
+ let count = blocks.len();
+ self.import_blocks(origin, blocks);
+
+ trace!(
+ target: LOG_TARGET,
+ "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
+ );
+ },
+ ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => {
+ self.import_justifications(peer_id, hash, number, justifications);
+
+ trace!(
+ target: LOG_TARGET,
+ "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
+ peer_id,
+ hash,
+ number,
+ )
+ },
+ });
+ }
+
fn perform_periodic_actions(&mut self) {
self.report_metrics();
@@ -766,28 +822,7 @@ where
ToServiceCommand::ClearJustificationRequests =>
self.chain_sync.clear_justification_requests(),
ToServiceCommand::BlocksProcessed(imported, count, results) => {
- for result in self.chain_sync.on_blocks_processed(imported, count, results) {
- match result {
- Ok(action) => match action {
- BlockRequestAction::SendRequest { peer_id, request } => {
- // drop obsolete pending response first
- self.pending_responses.remove(&peer_id);
- self.send_block_request(peer_id, request);
- },
- BlockRequestAction::RemoveStale { peer_id } => {
- self.pending_responses.remove(&peer_id);
- },
- },
- Err(BadPeer(peer_id, repu)) => {
- self.pending_responses.remove(&peer_id);
- self.network_service.disconnect_peer(
- peer_id,
- self.block_announce_protocol_name.clone(),
- );
- self.network_service.report_peer(peer_id, repu)
- },
- }
- }
+ self.chain_sync.on_blocks_processed(imported, count, results);
},
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
@@ -940,9 +975,7 @@ where
}
}
- if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) {
- self.import_blocks(import_blocks_action)
- }
+ self.chain_sync.peer_disconnected(&peer_id);
self.pending_responses.remove(&peer_id);
self.event_streams.retain(|stream| {
@@ -1053,17 +1086,7 @@ where
inbound,
};
- let req = if peer.info.roles.is_full() {
- match self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number) {
- Ok(req) => req,
- Err(BadPeer(id, repu)) => {
- self.network_service.report_peer(id, repu);
- return Err(())
- },
- }
- } else {
- None
- };
+ self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number);
log::debug!(target: LOG_TARGET, "Connected {peer_id}");
@@ -1075,10 +1098,6 @@ where
self.num_in_peers += 1;
}
- if let Some(req) = req {
- self.send_block_request(peer_id, req);
- }
-
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
@@ -1202,22 +1221,7 @@ where
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
- match self.chain_sync.on_block_response(peer_id, req, blocks) {
- OnBlockResponse::SendBlockRequest { peer_id, request } =>
- self.send_block_request(peer_id, request),
- OnBlockResponse::ImportBlocks(import_blocks_action) =>
- self.import_blocks(import_blocks_action),
- OnBlockResponse::ImportJustifications(action) =>
- self.import_justifications(action),
- OnBlockResponse::Nothing => {},
- OnBlockResponse::DisconnectPeer(BadPeer(peer_id, rep)) => {
- self.network_service.disconnect_peer(
- peer_id,
- self.block_announce_protocol_name.clone(),
- );
- self.network_service.report_peer(peer_id, rep);
- },
- }
+ self.chain_sync.on_block_response(peer_id, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
@@ -1262,27 +1266,10 @@ where
},
};
- match self.chain_sync.on_state_response(peer_id, response) {
- OnStateResponse::ImportBlocks(import_blocks_action) =>
- self.import_blocks(import_blocks_action),
- OnStateResponse::DisconnectPeer(BadPeer(peer_id, rep)) => {
- self.network_service.disconnect_peer(
- peer_id,
- self.block_announce_protocol_name.clone(),
- );
- self.network_service.report_peer(peer_id, rep);
- },
- OnStateResponse::Nothing => {},
- }
+ self.chain_sync.on_state_response(peer_id, response);
},
PeerRequest::WarpProof => {
- if let Err(BadPeer(peer_id, rep)) =
- self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp))
- {
- self.network_service
- .disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
- self.network_service.report_peer(peer_id, rep);
- }
+ self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp));
},
},
Ok(Err(e)) => {
@@ -1388,7 +1375,7 @@ where
}
/// Import blocks.
- fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction) {
+ fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_blocks_submitted.inc();
}
@@ -1397,13 +1384,17 @@ where
}
/// Import justifications.
- fn import_justifications(&mut self, action: ImportJustificationsAction) {
+ fn import_justifications(
+ &mut self,
+ peer_id: PeerId,
+ hash: B::Hash,
+ number: NumberFor<B>,
+ justifications: Justifications,
+ ) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_justifications_submitted.inc();
}
- let ImportJustificationsAction { peer_id, hash, number, justifications } = action;
-
self.import_queue.import_justifications(peer_id, hash, number, justifications);
}
}
diff --git a/substrate/primitives/runtime/src/lib.rs b/substrate/primitives/runtime/src/lib.rs
index 0e1d4c31fd71..ddf92554c830 100644
--- a/substrate/primitives/runtime/src/lib.rs
+++ b/substrate/primitives/runtime/src/lib.rs
@@ -954,6 +954,32 @@ pub fn print(print: impl traits::Printable) {
print.print();
}
+/// Utility function to declare string literals backed by an array of length N.
+///
+/// The input can be shorter than N, in that case the end of the array is padded with zeros.
+///
+/// [`str_array`] is useful when converting strings that end up in the storage as fixed size arrays
+/// or in const contexts where static data types have strings that could also end up in the storage.
+///
+/// # Example
+///
+/// ```rust
+/// # use sp_runtime::str_array;
+/// const MY_STR: [u8; 6] = str_array("data");
+/// assert_eq!(MY_STR, *b"data\0\0");
+/// ```
+pub const fn str_array<const N: usize>(s: &str) -> [u8; N] {
+ debug_assert!(s.len() <= N, "String literal doesn't fit in array");
+ let mut i = 0;
+ let mut arr = [0; N];
+ let s = s.as_bytes();
+ while i < s.len() {
+ arr[i] = s[i];
+ i += 1;
+ }
+ arr
+}
+
/// Describes on what should happen with a storage transaction.
pub enum TransactionOutcome {
/// Commit the transaction.