Skip to content

Commit

Permalink
Relay receiving + processing confirmations (paritytech#351)
Browse files Browse the repository at this point in the history
* relay receiving + processing confirmations

* fmt && clippy

* removed message processing race

* remove more traces

* generic args names
  • Loading branch information
svyatonik authored Sep 17, 2020
1 parent 5d249d4 commit 56c0ed4
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 46 deletions.
1 change: 1 addition & 0 deletions relays/ethereum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod message_lane;
mod message_lane_loop;
mod message_race_delivery;
mod message_race_loop;
mod message_race_receiving;
mod metrics;
mod rpc;
mod rpc_errors;
Expand Down
16 changes: 13 additions & 3 deletions relays/ethereum/src/message_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
//! One-way message lane types. Within single one-way lane we have three 'races' where we try to:
//!
//! 1) relay new messages from source to target node;
//! 2) relay proof-of-receiving from target to source node;
//! 3) relay proof-of-processing from target no source node.
//! 2) relay proof-of-receiving from target to source node.

use crate::utils::HeaderId;

use num_traits::{One, Zero};
use std::fmt::Debug;

/// One-way message lane.
Expand All @@ -32,10 +32,20 @@ pub trait MessageLane {
const TARGET_NAME: &'static str;

/// Message nonce type.
type MessageNonce: Clone + Copy + Debug + Default + From<u32> + Ord + std::ops::Add<Output = Self::MessageNonce>;
type MessageNonce: Clone
+ Copy
+ Debug
+ Default
+ From<u32>
+ Ord
+ std::ops::Add<Output = Self::MessageNonce>
+ One
+ Zero;

/// Messages proof.
type MessagesProof: Clone;
/// Messages receiving proof.
type MessagesReceivingProof: Clone;

/// Number of the source header.
type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq;
Expand Down
103 changes: 91 additions & 12 deletions relays/ethereum/src/message_lane_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use crate::message_race_delivery::run as run_message_delivery_race;
use crate::message_race_receiving::run as run_message_receiving_race;
use crate::utils::{interval, process_future_result, retry_backoff, FailedClient, MaybeConnectionError};

use async_trait::async_trait;
Expand All @@ -52,13 +53,25 @@ pub trait SourceClient<P: MessageLane>: Clone {
&self,
id: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error>;
/// Get nonce of the latest message, which receiving has been confirmed by the target chain.
async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error>;

/// Prove messages in inclusive range [begin; end].
async fn prove_messages(
&self,
id: SourceHeaderIdOf<P>,
nonces: RangeInclusive<P::MessageNonce>,
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<P::MessageNonce>, P::MessagesProof), Self::Error>;

/// Submit messages receiving proof.
async fn submit_messages_receiving_proof(
&self,
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<RangeInclusive<P::MessageNonce>, Self::Error>;
}

/// Target client trait.
Expand All @@ -73,12 +86,18 @@ pub trait TargetClient<P: MessageLane>: Clone {
/// Returns state of the client.
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;

/// Get nonce of latest message, which receival has been confirmed.
/// Get nonce of latest received message.
async fn latest_received_nonce(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error>;

/// Prove messages receiving at given block.
async fn prove_messages_receiving(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessagesReceivingProof), Self::Error>;

/// Submit messages proof.
async fn submit_messages_proof(
&self,
Expand Down Expand Up @@ -196,6 +215,19 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
)
.fuse();

let (
(receiving_source_state_sender, receiving_source_state_receiver),
(receiving_target_state_sender, receiving_target_state_receiver),
) = (unbounded(), unbounded());
let receiving_race_loop = run_message_receiving_race(
source_client.clone(),
receiving_source_state_receiver,
target_client.clone(),
receiving_target_state_receiver,
stall_timeout,
)
.fuse();

let exit_signal = exit_signal.fuse();

futures::pin_mut!(
Expand All @@ -206,6 +238,7 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
target_go_offline_future,
target_tick_stream,
delivery_race_loop,
receiving_race_loop,
exit_signal
);

Expand All @@ -224,7 +257,8 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
P::SOURCE_NAME,
new_source_state,
);
let _ = delivery_source_state_sender.unbounded_send(new_source_state);
let _ = delivery_source_state_sender.unbounded_send(new_source_state.clone());
let _ = receiving_source_state_sender.unbounded_send(new_source_state.clone());
},
&mut source_go_offline_future,
|delay| async_std::task::sleep(delay),
Expand All @@ -250,7 +284,8 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
P::TARGET_NAME,
new_target_state,
);
let _ = delivery_target_state_sender.unbounded_send(new_target_state);
let _ = delivery_target_state_sender.unbounded_send(new_target_state.clone());
let _ = receiving_target_state_sender.unbounded_send(new_target_state.clone());
},
&mut target_go_offline_future,
|delay| async_std::task::sleep(delay),
Expand All @@ -270,6 +305,12 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
Err(err) => return Err(err),
}
},
receiving_error = receiving_race_loop => {
match receiving_error {
Ok(_) => unreachable!("only ends with error; qed"),
Err(err) => return Err(err),
}
},

() = exit_signal => {
return Ok(());
Expand Down Expand Up @@ -304,6 +345,7 @@ pub(crate) mod tests {

pub type TestMessageNonce = u64;
pub type TestMessagesProof = RangeInclusive<TestMessageNonce>;
pub type TestMessagesReceivingProof = TestMessageNonce;

pub type TestSourceHeaderNumber = u64;
pub type TestSourceHeaderHash = u64;
Expand Down Expand Up @@ -333,7 +375,9 @@ pub(crate) mod tests {
const TARGET_NAME: &'static str = "TestTarget";

type MessageNonce = TestMessageNonce;

type MessagesProof = TestMessagesProof;
type MessagesReceivingProof = TestMessagesReceivingProof;

type SourceHeaderNumber = TestSourceHeaderNumber;
type SourceHeaderHash = TestSourceHeaderHash;
Expand All @@ -348,6 +392,8 @@ pub(crate) mod tests {
is_source_reconnected: bool,
source_state: SourceClientState<TestMessageLane>,
source_latest_generated_nonce: TestMessageNonce,
source_latest_confirmed_received_nonce: TestMessageNonce,
submitted_messages_receiving_proofs: Vec<TestMessagesReceivingProof>,
is_target_fails: bool,
is_target_reconnected: bool,
target_state: SourceClientState<TestMessageLane>,
Expand Down Expand Up @@ -383,7 +429,6 @@ pub(crate) mod tests {
Ok(data.source_state.clone())
}

/// Get nonce of instance of latest generated message.
async fn latest_generated_nonce(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
Expand All @@ -396,6 +441,15 @@ pub(crate) mod tests {
Ok((id, data.source_latest_generated_nonce))
}

async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
) -> Result<(SourceHeaderIdOf<TestMessageLane>, TestMessageNonce), Self::Error> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
Ok((id, data.source_latest_confirmed_received_nonce))
}

async fn prove_messages(
&self,
id: SourceHeaderIdOf<TestMessageLane>,
Expand All @@ -410,6 +464,18 @@ pub(crate) mod tests {
> {
Ok((id, nonces.clone(), nonces))
}

async fn submit_messages_receiving_proof(
&self,
_generated_at_block: TargetHeaderIdOf<TestMessageLane>,
proof: TestMessagesReceivingProof,
) -> Result<RangeInclusive<TestMessageNonce>, Self::Error> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
data.submitted_messages_receiving_proofs.push(proof);
data.source_latest_confirmed_received_nonce = proof;
Ok(proof..=proof)
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -452,7 +518,13 @@ pub(crate) mod tests {
Ok((id, data.target_latest_received_nonce))
}

/// Submit messages proof.
async fn prove_messages_receiving(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessagesReceivingProof), Self::Error> {
Ok((id, self.data.lock().target_latest_received_nonce))
}

async fn submit_messages_proof(
&self,
_generated_at_header: SourceHeaderIdOf<TestMessageLane>,
Expand Down Expand Up @@ -570,22 +642,29 @@ pub(crate) mod tests {
},
Arc::new(|_: &mut TestClientData| {}),
Arc::new(move |data: &mut TestClientData| {
if data.target_state.best_peer.0 < 10 {
// syncing source headers -> target chain (by one)
if data.target_state.best_peer.0 < data.source_state.best_self.0 {
data.target_state.best_peer =
HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1);
}
if data
.submitted_messages_proofs
.last()
.map(|last| *last.end() == 10)
.unwrap_or(false)
{
// syncing source headers -> target chain (all at once)
if data.source_state.best_peer.0 < data.target_state.best_self.0 {
data.source_state.best_peer = data.target_state.best_self;
}
// if target has received all messages => increase target block so that confirmations may be sent
if data.target_latest_received_nonce == 10 {
data.target_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1);
}
// if source has received all messages receiving confirmations => increase source block so that confirmations may be sent
if data.source_latest_confirmed_received_nonce == 10 {
exit_sender.unbounded_send(()).unwrap();
}
}),
exit_receiver.into_future().map(|(_, _)| ()),
);

assert_eq!(result.submitted_messages_proofs, vec![1..=4, 5..=8, 9..=10],);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
}
}
Loading

0 comments on commit 56c0ed4

Please sign in to comment.