Skip to content

Commit

Permalink
update reayer with latest subxt changes
Browse files Browse the repository at this point in the history
  • Loading branch information
salman01zp committed Apr 25, 2024
1 parent cf209c3 commit 53be1c6
Show file tree
Hide file tree
Showing 23 changed files with 1,091 additions and 292 deletions.
1,043 changes: 878 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ webb-relayer = { path = "services/webb-relayer" }
webb-proof-generation = { path = "crates/proof-generation" }
webb-circom-proving = { path = "crates/circom-proving" }

tangle-subxt = { git = "https://github.com/webb-tools/tangle", default-features = false, branch = "main" }

thiserror = "^1"
anyhow = "^1"
tracing = { version = "^0.1", features = ["log"] }
Expand All @@ -39,10 +41,10 @@ url = { version = "^2.3", features = ["serde"] }
sled = "^0.34"
tokio = { version = "^1", features = ["full"] }
config = { version = "0.13", default-features = false, features = ["toml", "json"] }
serde_json = { version = "^1", default-features = false }
serde_json = { version = "^1", default-features = false, features = ["raw_value"] }
paw = { version = "^1.0" }
webb = { version = "0.8.4", default-features = false }
subxt-signer = { version = "0.31", features = ["subxt"] }
subxt-signer = { version = "0.34", features = ["subxt"] }
# Used by ethers (but we need it to be vendored with the lib).
native-tls = { version = "^0.2", features = ["vendored"] }
webb-proposals = { git = "https://github.com/webb-tools/webb-rs", features = ["scale"] }
Expand All @@ -64,7 +66,7 @@ serde = { version = "^1", default-features = false, features = ["derive"] }
glob = "^0.3"
serde_path_to_error = "0.1.9"
serde_bytes = "0.11"
jsonrpsee = { version = "0.16.2" }
jsonrpsee = { version = "0.20.3" }

[profile.release]
strip = "symbols"
Expand Down
1 change: 1 addition & 0 deletions crates/event-watcher-traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ native-tls = { workspace = true }
webb-proposals = { workspace = true }
sled = { version = "^0.34" }
subxt-signer = { workspace = true }
tangle-subxt = { workspace = true }

[dev-dependencies]
tracing-test = "0.2"
Expand Down
28 changes: 23 additions & 5 deletions crates/event-watcher-traits/src/substrate/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use tangle_subxt::subxt::{
backend::{legacy::LegacyRpcMethods, rpc::RpcClient},
config::Header,
OnlineClient,
};
use tokio::sync::Mutex;
use webb::substrate::subxt::{config::Header, OnlineClient};
use webb_relayer_config::event_watcher::EventsWatcherConfig;
use webb_relayer_context::RelayerContext;
use webb_relayer_utils::{metric, retry};
Expand Down Expand Up @@ -168,10 +172,24 @@ where
return Err(backoff::Error::transient(err));
}
};

let maybe_rpc_client = ctx.get_ws_client::<_>(chain_id).await;
let rpc = match maybe_rpc_client {
Ok(ws_client) => {
let rpc_client = RpcClient::new(ws_client);
LegacyRpcMethods::<RuntimeConfig>::new(rpc_client.clone())
}
Err(err) => {
tracing::error!(
"Failed to connect with substrate rpc client for chain_id: {}, retrying...!",
chain_id
);
return Err(backoff::Error::transient(err));
}
};
let client = Arc::new(client);
let mut instant = std::time::Instant::now();
let step = 1u64;
let rpc = client.rpc();
// get pallet index
let pallet_index = {
let metadata = client.metadata();
Expand All @@ -196,13 +214,13 @@ where
// now we start polling for new events.
// get the current latest block number.
let latest_head = rpc
.finalized_head()
.chain_get_finalized_head()
.map_err(Into::into)
.map_err(backoff::Error::transient)
.await?;

let maybe_latest_header = rpc
.header(Some(latest_head))
.chain_get_header(Some(latest_head))
.map_err(Into::into)
.map_err(backoff::Error::transient)
.await?;
Expand Down Expand Up @@ -243,7 +261,7 @@ where
// range [block, dest_block].
// so first we get the hash of the block we want to start from.
let maybe_from = rpc
.block_hash(Some(dest_block.into()))
.chain_get_block_hash(Some(dest_block.into()))
.map_err(Into::into)
.map_err(backoff::Error::transient)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/event-watcher-traits/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::prelude::*;
use std::cmp;
use std::sync::Arc;
use std::time::Duration;
use webb::substrate::subxt::{self, client::OnlineClientT, config::Header};
use tangle_subxt::subxt::{self, client::OnlineClientT, config::Header};
use webb_proposals::{
ResourceId, SubstrateTargetSystem, TargetSystem, TypedChainId,
};
Expand Down
4 changes: 2 additions & 2 deletions crates/event-watcher-traits/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
// limitations under the License.

use std::sync::Arc;
use tangle_subxt::subxt::{self, Config, OnlineClient};
use tangle_subxt::tangle_testnet_runtime::api::system;
use tokio::sync::Mutex;
use webb::substrate::subxt::{self, Config, OnlineClient};
use webb::substrate::tangle_runtime::api::system;
use webb_relayer_config::event_watcher::EventsWatcherConfig;
use webb_relayer_context::RelayerContext;
use webb_relayer_store::sled::SledStore;
Expand Down
1 change: 1 addition & 0 deletions crates/relayer-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
http = "0.2.9"
regex = { version = "1" }
tangle-subxt = { workspace = true }

[features]
default = ["std", "evm", "substrate"]
Expand Down
4 changes: 3 additions & 1 deletion crates/relayer-context/src/ethers_retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ impl RetryPolicy<ProviderError> for WebbHttpRetryPolicy {
if let Some(data) = &json_rpc_error.data {
// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let Some(backoff_seconds) = data.get("rate").and_then(|v| v.get("backoff_seconds")) else {
let Some(backoff_seconds) =
data.get("rate").and_then(|v| v.get("backoff_seconds"))
else {
return Some(DEFAULT_BACKOFF);
};
// infura rate limit error
Expand Down
37 changes: 29 additions & 8 deletions crates/relayer-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
//! A module for managing the context of the relayer.
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use tangle_subxt::subxt::OnlineClient;
use tokio::sync::{broadcast, Mutex};
use webb::substrate::subxt::OnlineClient;
use webb_relayer_tx_queue::evm::EvmTxQueueConfig;
use webb_relayer_tx_queue::substrate::SubstrateTxQueueConfig;
use webb_relayer_types::rpc_client::WebbRpcClient;
Expand All @@ -32,12 +32,12 @@ use webb::evm::ethers::prelude::*;

#[cfg(feature = "substrate")]
use subxt_signer::sr25519::Keypair as Sr25519Pair;
#[cfg(feature = "substrate")]
use tangle_subxt::subxt;
use webb::evm::ethers::middleware::gas_oracle::{
Cache as CachedGasOracle, Etherchain as EtherscanGasOracle,
Median as GasOracleMedian, ProviderOracle,
};
#[cfg(feature = "substrate")]
use webb::substrate::subxt;

use webb_price_oracle_backends::{
CachedPriceBackend, CoinGeckoBackend, DummyPriceBackend, PriceOracleMerger,
Expand Down Expand Up @@ -232,6 +232,21 @@ impl RelayerContext {
let wallet = LocalWallet::from(key).with_chain_id(chain_id);
Ok(wallet)
}

/// Returns a Substrate ws client for the given chain.
#[cfg(feature = "substrate")]
pub async fn get_ws_client<I: Into<types::U256>>(
&self,
chain_id: I,
) -> webb_relayer_utils::Result<Arc<WebbRpcClient>> {
let substrate_ws_clients = self.substrate_providers.lock().await;
substrate_ws_clients
.get(&chain_id.into())
.cloned()
.ok_or_else(|| {
webb_relayer_utils::Error::Generic("Ws client not found")
})
}
/// Sets up and returns a Substrate client for the relayer.
///
/// # Arguments
Expand All @@ -242,6 +257,10 @@ impl RelayerContext {
&self,
chain_id: I,
) -> webb_relayer_utils::Result<subxt::OnlineClient<C>> {
use tangle_subxt::subxt::backend::{
legacy::LegacyBackend, rpc::RpcClient,
};

let chain_id: types::U256 = chain_id.into();
let chain_name = chain_id.to_string();
let node_config =
Expand All @@ -254,15 +273,17 @@ impl RelayerContext {
if let Some(webb_rpc_client) = substrate_providers.get(&chain_id) {
// check if rpc is connected if not create a new connection and cache it
let substrate_client = if webb_rpc_client.0.is_connected() {
subxt::OnlineClient::<C>::from_rpc_client(
webb_rpc_client.clone(),
)
.await?
let legacy_baclend =
LegacyBackend::new(RpcClient::new(webb_rpc_client.clone()));
subxt::OnlineClient::<C>::from_backend(legacy_baclend.into())
.await?
} else {
let url = node_config.ws_endpoint.to_string();
let webb_rpc_client = Arc::new(WebbRpcClient::new(url).await?);
substrate_providers.insert(chain_id, webb_rpc_client.clone());
subxt::OnlineClient::<C>::from_rpc_client(webb_rpc_client)
let legacy_baclend =
LegacyBackend::new(RpcClient::new(webb_rpc_client.clone()));
subxt::OnlineClient::<C>::from_backend(legacy_baclend.into())
.await?
};
Ok(substrate_client)
Expand Down
2 changes: 2 additions & 0 deletions crates/relayer-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ ethereum-types = { workspace = true }
jsonrpsee = { workspace = true, features = ["ws-client"]}
http = "0.2.9"
subxt-signer = { workspace = true }
tangle-subxt = { workspace = true }
serde_json = { workspace = true }
tiny-bip39 = "1.0.0"
75 changes: 58 additions & 17 deletions crates/relayer-types/src/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,42 @@
use jsonrpsee::async_client::ClientBuilder;
use jsonrpsee::client_transport::ws::WsTransportClientBuilder;
use jsonrpsee::core::client::Client;
use jsonrpsee::core::JsonRawValue;
use webb::substrate::subxt::{self, rpc::RpcClientT};
use jsonrpsee::core::client::{
Client, ClientT, SubscriptionClientT, SubscriptionKind,
};
use jsonrpsee::core::traits::ToRpcParams;
use jsonrpsee::types::SubscriptionId;
use jsonrpsee::ws_client::WsClientBuilder;
use serde_json::value::RawValue;
use tangle_subxt::subxt::backend::rpc::{
RawRpcFuture, RawRpcSubscription, RpcClientT,
};
use tangle_subxt::subxt::error::RpcError;
use tangle_subxt::subxt::ext::futures::{StreamExt, TryStreamExt};

#[derive(Debug)]
pub struct WebbRpcClient(pub Client);

struct Params(Option<Box<RawValue>>);
impl ToRpcParams for Params {
fn to_rpc_params(
self,
) -> Result<Option<Box<RawValue>>, jsonrpsee::core::Error> {
Ok(self.0)
}
}

impl WebbRpcClient {
pub async fn new(
url: impl Into<String>,
) -> webb_relayer_utils::Result<Self> {
let url: http::Uri = url.into().parse().map_err(|_| {
webb_relayer_utils::Error::Generic("RPC url is invalid")
})?;
let (sender, receiver) = WsTransportClientBuilder::default()
.build(url)
let client = WsClientBuilder::default()
.build(url.to_string())
.await
.map_err(|_| {
webb_relayer_utils::Error::Generic("RPC failed to connect")
})?;

let client = ClientBuilder::default()
.max_notifs_per_subscription(4096)
.build_with_tokio(sender, receiver);

Ok(Self(client))
}
}
Expand All @@ -33,17 +45,46 @@ impl RpcClientT for WebbRpcClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<JsonRawValue>>,
) -> subxt::rpc::RpcFuture<'a, Box<JsonRawValue>> {
self.0.request_raw(method, params)
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
Box::pin(async move {
let res = self
.0
.request(method, Params(params))
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;

Ok(res)
})
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<JsonRawValue>>,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> subxt::rpc::RpcFuture<'a, subxt::rpc::RpcSubscription> {
self.0.subscribe_raw(sub, params, unsub)
) -> RawRpcFuture<'a, RawRpcSubscription> {
Box::pin(async move {
let stream = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
&self.0,
sub,
Params(params),
unsub,
)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;

let id = match stream.kind() {
SubscriptionKind::Subscription(SubscriptionId::Str(id)) => {
Some(id.clone().into_owned())
}
_ => None,
};

let stream = stream
.map_err(|e| RpcError::ClientError(Box::new(e)))
.boxed();
Ok(RawRpcSubscription { stream, id })
})
}
}
2 changes: 2 additions & 0 deletions crates/relayer-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ ark-std = { version = "^0.3.0", default-features = false }
derive_more = { version = "0.99", default-features = false, features = ["display"] }
prometheus = { version = "0.13.0", default-features = false }
hyper = "0.14.24"
tangle-subxt = { workspace = true }
jsonrpsee = { workspace = true }

[features]
default = ["evm-runtime", "substrate-runtime"]
Expand Down
8 changes: 6 additions & 2 deletions crates/relayer-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use std::sync::Arc;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use multi_provider::MultiProvider;
use webb::substrate::subxt::PolkadotConfig;
use webb::{evm::ethers, substrate::subxt};
use tangle_subxt::subxt;
use tangle_subxt::subxt::PolkadotConfig;
use webb::evm::ethers;
use webb_proposals::ResourceId;

pub mod clickable_link;
Expand Down Expand Up @@ -252,6 +253,9 @@ pub enum Error {
/// Deserialization error.
#[error(transparent)]
DeserializationError(#[from] webb_proposals::DeserializationError),
/// Jsonrpsee Error
#[error(transparent)]
JsonrpseeError(#[from] jsonrpsee::core::Error),
}

/// Vanchor withdraw tx relaying errors.
Expand Down
4 changes: 2 additions & 2 deletions crates/relayer-utils/src/static_tx_payload.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::fmt;

use webb::substrate::subxt::{
use tangle_subxt::subxt::{
self,
ext::scale_encode::EncodeAsFields,
tx::{Payload, TxPayload},
Expand Down Expand Up @@ -71,7 +71,7 @@ impl TxPayload for TypeErasedStaticTxPayload {
&self,
_metadata: &subxt::Metadata,
out: &mut Vec<u8>,
) -> Result<(), webb::substrate::subxt::Error> {
) -> Result<(), tangle_subxt::subxt::Error> {
*out = self.tx_data.clone();
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/tx-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ subxt-signer = { workspace = true, optional = true }
# Used by ethers (but we need it to be vendored with the lib).
native-tls = { workspace = true, optional = true }
ethereum-types = { workspace = true }
tangle-subxt = { workspace = true }

rand = { workspace = true, default-features = false, features = ["getrandom"] }

Expand Down
Loading

0 comments on commit 53be1c6

Please sign in to comment.