diff --git a/bridges/relays/client-substrate/src/client/mod.rs b/bridges/relays/client-substrate/src/client/mod.rs index a051d15e9ccba..4eeb564899af9 100644 --- a/bridges/relays/client-substrate/src/client/mod.rs +++ b/bridges/relays/client-substrate/src/client/mod.rs @@ -33,7 +33,7 @@ mod rpc_api; mod subscription; pub use client::Client; -pub use subscription::{SharedSubscriptionFactory, Subscription}; +pub use subscription::{SharedSubscriptionFactory, Subscription, UnderlyingSubscription}; /// Type of RPC client with caching support. pub type RpcWithCachingClient = CachingClient>; diff --git a/bridges/relays/client-substrate/src/client/rpc.rs b/bridges/relays/client-substrate/src/client/rpc.rs index ee8dba89d2437..353f08a0fd3fe 100644 --- a/bridges/relays/client-substrate/src/client/rpc.rs +++ b/bridges/relays/client-substrate/src/client/rpc.rs @@ -24,13 +24,14 @@ use crate::{ SubstrateFrameSystemClient, SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient, }, + subscription::{Subscription, Unwrap}, Client, }, error::{Error, Result}, guard::Environment, transaction_stall_timeout, AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainRuntimeVersion, ChainWithGrandpa, ChainWithTransactions, ConnectionParams, HashOf, - HeaderIdOf, HeaderOf, NonceOf, SignParam, SignedBlockOf, SimpleRuntimeVersion, Subscription, + HeaderIdOf, HeaderOf, NonceOf, SignParam, SignedBlockOf, SimpleRuntimeVersion, TransactionTracker, UnsignedTransaction, }; @@ -495,26 +496,22 @@ impl Client for RpcClient { ); let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode(); let tx_hash = C::Hasher::hash(&signed_extrinsic); - let subscription = SubstrateAuthorClient::::submit_and_watch_extrinsic( - &*client, - Bytes(signed_extrinsic), - ) - .await - .map_err(|e| { - log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); - e - })?; + let subscription: jsonrpsee::core::client::Subscription<_> = + SubstrateAuthorClient::::submit_and_watch_extrinsic( + &*client, + Bytes(signed_extrinsic), + ) + .await + .map_err(|e| { + log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e); + e + })?; log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash); Ok(TransactionTracker::new( self_clone, stall_timeout, tx_hash, - Subscription::new( - C::NAME.into(), - "transaction events".into(), - Box::new(subscription.map_err(Into::into)), - ) - .await?, + Box::new(Unwrap::new(C::NAME.into(), "transaction events".into(), subscription)), )) }) .await diff --git a/bridges/relays/client-substrate/src/client/subscription.rs b/bridges/relays/client-substrate/src/client/subscription.rs index 3bb623d7dbcbe..14aa7a146b5d1 100644 --- a/bridges/relays/client-substrate/src/client/subscription.rs +++ b/bridges/relays/client-substrate/src/client/subscription.rs @@ -20,15 +20,72 @@ use async_std::{ channel::{bounded, Receiver, Sender}, stream::StreamExt, }; -use futures::{future::FutureExt, Stream}; +use futures::{FutureExt, Stream}; use sp_runtime::DeserializeOwned; -use std::future::Future; +use std::{ + fmt::Debug, + pin::Pin, + task::{Context, Poll}, +}; /// Once channel reaches this capacity, the subscription breaks. const CHANNEL_CAPACITY: usize = 128; /// Underlying subscription type. -pub type UnderlyingSubscription = Box> + Unpin + Send>; +pub type UnderlyingSubscription = Box + Unpin + Send>; + +/// Chainable stream that transforms items of type `Result` to items of type `T`. +/// +/// If it encounters an item of type `Err`, it returns `Poll::Ready(None)` +/// and terminates the underlying stream. +pub struct Unwrap>, T, E> { + chain_name: String, + item_type: String, + subscription: Option, +} + +impl>, T, E> Unwrap { + /// Create a new instance of `Unwrap`. + pub fn new(chain_name: String, item_type: String, subscription: S) -> Self { + Self { chain_name, item_type, subscription: Some(subscription) } + } +} + +impl> + Unpin, T: DeserializeOwned, E: Debug> Stream + for Unwrap +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(match self.subscription.as_mut() { + Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) { + Some(Ok(item)) => Some(item), + Some(Err(e)) => { + self.subscription.take(); + log::debug!( + target: "bridge", + "{} stream of {} has returned error: {:?}. It may need to be restarted", + self.item_type, + self.chain_name, + e, + ); + None + }, + None => { + self.subscription.take(); + log::debug!( + target: "bridge", + "{} stream of {} has returned `None`. It may need to be restarted", + self.item_type, + self.chain_name, + ); + None + }, + }, + None => None, + }) + } +} /// Subscription factory that produces subscriptions, sharing the same background thread. #[derive(Clone)] @@ -41,13 +98,13 @@ impl SharedSubscriptionFactory pub async fn new( chain_name: String, item_type: String, - subscribe: impl Future>> + Send + 'static, + subscription: UnderlyingSubscription>, ) -> Self { let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY); async_std::task::spawn(background_worker( - chain_name, - item_type, - subscribe, + chain_name.clone(), + item_type.clone(), + Box::new(Unwrap::new(chain_name, item_type, subscription)), subscribers_receiver, )); Self { subscribers_sender } @@ -73,16 +130,12 @@ impl Subscription { pub async fn new( chain_name: String, item_type: String, - subscription: UnderlyingSubscription, + subscription: UnderlyingSubscription>, ) -> Result { - SharedSubscriptionFactory::::new( - chain_name, - item_type, - futures::future::ready(Ok(subscription)), - ) - .await - .subscribe() - .await + SharedSubscriptionFactory::::new(chain_name, item_type, subscription) + .await + .subscribe() + .await } /// Return subscription factory for this subscription. @@ -91,7 +144,7 @@ impl Subscription { } /// Consumes subscription and returns future items stream. - pub fn into_stream(self) -> impl futures::Stream { + pub fn into_stream(self) -> impl Stream { futures::stream::unfold(self, |mut this| async { let item = this.items_receiver.next().await.unwrap_or(None); item.map(|i| (i, this)) @@ -112,7 +165,7 @@ impl Subscription { async fn background_worker( chain_name: String, item_type: String, - subscribe: impl Future>> + Send + 'static, + mut subscription: UnderlyingSubscription, mut subscribers_receiver: Receiver>>, ) { fn log_task_exit(chain_name: &str, item_type: &str, reason: &str) { @@ -125,50 +178,6 @@ async fn background_worker( ); } - async fn notify_subscribers( - chain_name: &str, - item_type: &str, - subscribers: &mut Vec>>, - result: Option>, - ) { - let result_to_send = match result { - Some(Ok(item)) => Some(item), - Some(Err(e)) => { - log::debug!( - target: "bridge", - "{} stream of {} has returned error: {:?}. It may need to be restarted", - item_type, - chain_name, - e, - ); - None - }, - None => { - log::debug!( - target: "bridge", - "{} stream of {} has returned `None`. It may need to be restarted", - item_type, - chain_name, - ); - None - }, - }; - - let mut i = 0; - while i < subscribers.len() { - let result_to_send = result_to_send.clone(); - let send_result = subscribers[i].try_send(result_to_send); - match send_result { - Ok(_) => { - i += 1; - }, - Err(_) => { - subscribers.swap_remove(i); - }, - } - } - } - // wait for first subscriber until actually starting subscription let subscriber = match subscribers_receiver.next().await { Some(subscriber) => subscriber, @@ -181,16 +190,6 @@ async fn background_worker( // actually subscribe let mut subscribers = vec![subscriber]; - let mut jsonrpsee_subscription = match subscribe.await { - Ok(jsonrpsee_subscription) => jsonrpsee_subscription, - Err(e) => { - let reason = format!("failed to subscribe: {:?}", e); - notify_subscribers(&chain_name, &item_type, &mut subscribers, Some(Err(e))).await; - - // we cant't do anything without underlying subscription, so let's exit - return log_task_exit(&chain_name, &item_type, &reason) - }, - }; // start listening for new items and receivers loop { @@ -205,10 +204,13 @@ async fn background_worker( }, } }, - item = jsonrpsee_subscription.next().fuse() => { + item = subscription.next().fuse() => { let is_stream_finished = item.is_none(); - let item = item.map(|r| r.map_err(Into::into)); - notify_subscribers(&chain_name, &item_type, &mut subscribers, item).await; + // notify subscribers + subscribers.retain(|subscriber| { + let send_result = subscriber.try_send(item.clone()); + send_result.is_ok() + }); // it means that the underlying client has dropped, so we can't do anything here // and need to stop the task diff --git a/bridges/relays/client-substrate/src/transaction_tracker.rs b/bridges/relays/client-substrate/src/transaction_tracker.rs index 14a97b7a049d7..1dc97faf2a6a8 100644 --- a/bridges/relays/client-substrate/src/transaction_tracker.rs +++ b/bridges/relays/client-substrate/src/transaction_tracker.rs @@ -16,7 +16,9 @@ //! Helper for tracking transaction invalidation events. -use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf}; +use crate::{ + client::UnderlyingSubscription, Chain, Error, HashOf, HeaderIdOf, TransactionStatusOf, +}; use async_trait::async_trait; use futures::{future::Either, Future, FutureExt, Stream, StreamExt}; @@ -64,7 +66,7 @@ pub struct TransactionTracker { environment: E, transaction_hash: HashOf, stall_timeout: Duration, - subscription: Subscription>, + subscription: UnderlyingSubscription>, } impl> TransactionTracker { @@ -73,7 +75,7 @@ impl> TransactionTracker { environment: E, stall_timeout: Duration, transaction_hash: HashOf, - subscription: Subscription>, + subscription: UnderlyingSubscription>, ) -> Self { Self { environment, stall_timeout, transaction_hash, subscription } } @@ -105,7 +107,7 @@ impl> TransactionTracker { let wait_for_invalidation = watch_transaction_status::<_, C, _>( self.environment, self.transaction_hash, - self.subscription.into_stream(), + self.subscription, ); futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation); @@ -328,9 +330,7 @@ mod tests { TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), - Subscription::new("test".into(), "test".into(), Box::new(receiver)) - .await - .unwrap(), + Box::new(receiver), ); // we can't do `.now_or_never()` on `do_wait()` call, because `Subscription` has its own @@ -338,7 +338,7 @@ mod tests { // relatively small timeout here let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100)); let wait_for_stall_timeout_rest = futures::future::ready(()); - sender.send(Ok(status)).await.unwrap(); + sender.send(status).await.unwrap(); let (ts, is) = tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await; @@ -455,9 +455,7 @@ mod tests { TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), - Subscription::new("test".into(), "test".into(), Box::new(receiver)) - .await - .unwrap(), + Box::new(receiver), ); let wait_for_stall_timeout = futures::future::ready(()).shared();