Skip to content

Commit

Permalink
Substrate tx-queue tracking and http endpoints for processing withdra…
Browse files Browse the repository at this point in the history
…w tx (#551)

Co-authored-by: shekohex <dev+github@shadykhalifa.me>
  • Loading branch information
salman01zp and shekohex authored Jul 4, 2023
1 parent 1611ee5 commit 9449b03
Show file tree
Hide file tree
Showing 21 changed files with 340 additions and 650 deletions.
149 changes: 4 additions & 145 deletions crates/relayer-handler-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
#![allow(missing_docs)]

use serde::{Deserialize, Deserializer, Serialize};
use tokio::sync::mpsc;
use webb::evm::ethers::abi::Address;
use webb::evm::ethers::prelude::{ContractError, I256, U128};
use webb::evm::ethers::providers::Middleware;
use webb::evm::ethers::prelude::{I256, U128};

use webb::evm::ethers::types::Bytes;
use webb::evm::ethers::types::{H256, H512, U256};
use webb_relayer_store::queue::QueueItemState;
use webb::evm::ethers::types::{H256, U256};

use webb_relayer_tx_relay_utils::VAnchorRelayTransaction;

/// Representation for IP address response
Expand Down Expand Up @@ -69,111 +68,6 @@ impl<'de> Deserialize<'de> for WebbI128 {
}
}

/// Type of Command to use
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Command {
/// Substrate specific subcommand.
Substrate(SubstrateCommandType),
/// EVM specific subcommand.
Evm(EvmCommandType),
/// Ping?
Ping(),
}

/// Enumerates the supported evm commands for relaying transactions
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum EvmCommandType {
/// Webb Variable Anchors.
VAnchor(EvmVanchorCommand),
}

/// Enumerates the supported substrate commands for relaying transactions
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SubstrateCommandType {
/// Webb Variable Anchors.
VAnchor(SubstrateVAchorCommand),
}

/// Enumerates the command responses
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum CommandResponse {
/// Pong?
Pong(),
/// Network Status
Network(NetworkStatus),
/// Withdrawal Status
Withdraw(WithdrawStatus),
/// An error occurred
Error(String),
/// Tx status
TxStatus {
/// The transaction item key.
#[serde(rename = "itemKey")]
item_key: H512,
status: QueueItemState,
},
}
/// Enumerates the network status response of the relayer
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum NetworkStatus {
/// Relayer is connecting to the network.
Connecting,
/// Relayer is connected to the network.
Connected,
/// Network failure with error message.
Failed {
/// Error message
reason: String,
},
/// Relayer is disconnected from the network.
Disconnected,
/// This contract is not supported by the relayer.
UnsupportedContract,
/// This network (chain) is not supported by the relayer.
UnsupportedChain,
/// Invalid Relayer address in the proof
InvalidRelayerAddress,
}
/// Enumerates the withdraw status response of the relayer
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum WithdrawStatus {
/// The transaction is sent to the network.
Sent,
/// The transaction is submitted to the network.
Submitted {
/// The transaction hash.
#[serde(rename = "txHash")]
tx_hash: H256,
},
/// The transaction is in the block.
Finalized {
/// The transaction hash.
#[serde(rename = "txHash")]
tx_hash: H256,
},
/// Valid transaction.
Valid,
/// Invalid Merkle roots.
InvalidMerkleRoots,
/// Transaction dropped from mempool, send it again.
DroppedFromMemPool,
/// Invalid transaction.
Errored {
/// Error Code.
code: i32,
/// Error Message.
reason: String,
},
}

/// Type alias for mpsc::Sender<CommandResponse>
pub type CommandStream = mpsc::Sender<CommandResponse>;
/// The command type for EVM vanchor transactions
pub type EvmVanchorCommand = VAnchorRelayTransaction<
Address, // Contract address
Expand All @@ -198,38 +92,3 @@ type T = u32; // Substrate assetId
/// The command type for Substrate vanchor txes
pub type SubstrateVAchorCommand =
VAnchorRelayTransaction<Id, P, R, E, I, B, A, T>;

/// A helper function to extract the error code and the reason from EVM errors.
pub fn into_withdraw_error<M: Middleware>(
e: ContractError<M>,
) -> WithdrawStatus {
// a poor man error parser
// WARNING: **don't try this at home**.
let msg = format!("{e}");
// split the error into words, lazily.
let mut words = msg.split_whitespace();
let mut reason = "unknown".to_string();
let mut code = -1;

while let Some(current_word) = words.next() {
if current_word == "(code:" {
code = match words.next() {
Some(val) => {
let mut v = val.to_string();
v.pop(); // remove ","
v.parse().unwrap_or(-1)
}
_ => -1, // unknown code
};
} else if current_word == "message:" {
// next we need to collect all words in between "message:"
// and "data:", that would be the error message.
let msg: Vec<_> =
words.clone().take_while(|v| *v != "data:").collect();
reason = msg.join(" ");
reason.pop(); // remove the "," at the end.
}
}

WithdrawStatus::Errored { reason, code }
}
214 changes: 1 addition & 213 deletions crates/relayer-handlers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,219 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Relayer handlers for HTTP/Socket calls

#![allow(clippy::large_enum_variant)]
#![warn(missing_docs)]
use axum::extract::{Path, State, WebSocketUpgrade};
use ethereum_types::{Address, U256};
use std::error::Error;
use std::sync::Arc;

use futures::prelude::*;

use axum::extract::ws::{Message, WebSocket};
use axum::http::StatusCode;
use axum::response::Response;
use axum::Json;
use axum_client_ip::InsecureClientIp;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use webb_proposals::TypedChainId;

use webb_relayer_context::RelayerContext;
use webb_relayer_handler_utils::{
Command, CommandResponse, CommandStream, IpInformationResponse,
SubstrateCommandType,
};
use webb_relayer_tx_relay::evm::fees::{get_evm_fee_info, EvmFeeInfo};

use webb_relayer_tx_relay::substrate::fees::{
get_substrate_fee_info, SubstrateFeeInfo,
};
use webb_relayer_tx_relay::substrate::vanchor::handle_substrate_vanchor_relay_tx;
use webb_relayer_utils::HandlerError;
//! Relayer handlers for HTTP calls.

/// Module handles relayer API
pub mod routes;

/// Wait for websocket connection upgrade
pub async fn websocket_handler(
ws: WebSocketUpgrade,
State(ctx): State<Arc<RelayerContext>>,
) -> Response {
ws.on_upgrade(move |socket| accept_websocket_connection(socket, ctx))
}

/// Sets up a websocket connection.
///
/// # Arguments
///
/// * `ctx` - RelayContext reference that holds the configuration
/// * `stream` - Websocket stream
async fn accept_websocket_connection(ws: WebSocket, ctx: Arc<RelayerContext>) {
let (mut tx, mut rx) = ws.split();

// Wait for client to send over text (such as relay transaction requests)
while let Some(msg) = rx.next().await {
match msg {
Ok(msg) => {
if let Ok(text) = msg.to_text() {
// Use inspect_err() here once stabilized
let _ =
handle_text(&ctx, text, &mut tx).await.map_err(|e| {
tracing::warn!("Websocket handler error: {e}")
});
}
}
Err(e) => {
tracing::warn!("Websocket error: {e}");
return;
}
}
}
}

/// Sets up a websocket channels for message sending.
///
/// This is primarily used for transaction relaying. The intention is
/// that a user will send formatted relay requests to the relayer using
/// the websocket. The command will be extracted and sent to `handle_cmd`
/// if successfully deserialized.
///
/// Returns `Ok(())` on success
///
/// # Arguments
///
/// * `ctx` - RelayContext reference that holds the configuration
/// * `v` - The text (usually in a JSON form) message to be handled.
/// * `tx` - A mutable Trait implementation of the `warp::ws::Sender` trait
pub async fn handle_text<TX>(
ctx: &RelayerContext,
v: &str,
tx: &mut TX,
) -> webb_relayer_utils::Result<()>
where
TX: Sink<Message> + Unpin,
TX::Error: Error + Send + Sync + 'static,
{
// for every connection, we create a new channel, where we will use to send messages
// over it.
let (my_tx, my_rx) = mpsc::channel(50);
let res_stream = ReceiverStream::new(my_rx);
match serde_json::from_str(v) {
Ok(cmd) => {
if let Err(e) = handle_cmd(ctx.clone(), cmd, my_tx.clone()).await {
tracing::error!("{:?}", e);
let _ = my_tx.send(e).await;
}
// Send back the response, usually a transaction hash
// from processing the transaction relaying command.
res_stream
.fuse()
.map(|v| serde_json::to_string(&v).expect("bad value"))
.inspect(|v| tracing::trace!("Sending: {}", v))
.map(Message::Text)
.map(Result::Ok)
.forward(tx)
.map_err(|_| webb_relayer_utils::Error::FailedToSendResponse)
.await?;
}
Err(e) => {
tracing::warn!("Got invalid payload: {:?}", e);
tracing::debug!("Invalid payload: {:?}", v);
let error = CommandResponse::Error(e.to_string());
let value = serde_json::to_string(&error)?;
tx.send(Message::Text(value))
.map_err(|_| webb_relayer_utils::Error::FailedToSendResponse)
.await?;
}
};
Ok(())
}

/// Handles the socket address response
///
/// Returns a Result with the `IpInformationResponse` on success
///
/// # Arguments
///
/// * `ip` - Extractor for client IP, taking into account x-forwarded-for and similar headers
pub async fn handle_socket_info(
InsecureClientIp(ip): InsecureClientIp,
) -> Json<IpInformationResponse> {
Json(IpInformationResponse { ip: ip.to_string() })
}

/// Handles the command prompts for EVM and Substrate chains
///
/// # Arguments
///
/// * `ctx` - RelayContext reference that holds the configuration
/// * `cmd` - The command to execute
/// * `stream` - The stream to write the response to
pub async fn handle_cmd(
ctx: RelayerContext,
cmd: Command,
stream: CommandStream,
) -> Result<(), CommandResponse> {
if !ctx.config.features.private_tx_relay {
return Err(CommandResponse::Error(
"Private transaction relaying is not enabled.".to_string(),
));
}

match cmd {
Command::Substrate(substrate) => match substrate {
SubstrateCommandType::VAnchor(vanchor) => {
handle_substrate_vanchor_relay_tx(ctx, vanchor, stream).await
}
},
Command::Ping() => {
let _ = stream.send(CommandResponse::Pong()).await;
Ok(())
}
_ => {
unimplemented!()
}
}
}

/// Handler for fee estimation
///
/// # Arguments
///
/// * `chain_id` - ID of the blockchain
/// * `vanchor` - Address of the smart contract
/// * `gas_amount` - How much gas the transaction needs. Don't use U256 here because it
/// gets parsed incorrectly.
pub async fn handle_evm_fee_info(
State(ctx): State<Arc<RelayerContext>>,
Path((chain_id, vanchor, gas_amount)): Path<(u32, Address, u64)>,
) -> Result<Json<EvmFeeInfo>, HandlerError> {
let chain_id = TypedChainId::Evm(chain_id);
let gas_amount = U256::from(gas_amount);
Ok(
get_evm_fee_info(chain_id, vanchor, gas_amount, ctx.as_ref())
.await
.map(Json)?,
)
}

/// Handler for fee estimation
///
/// # Arguments
/// * `chain_id` - ID of the blockchain
/// * `estimated_tx_fees` - Estimated transaction fees
/// * `ctx` - RelayContext reference that holds the configuration
pub async fn handle_substrate_fee_info(
State(ctx): State<Arc<RelayerContext>>,
Path((chain_id, estimated_tx_fees)): Path<(u64, u128)>,
) -> Result<Json<SubstrateFeeInfo>, HandlerError> {
get_substrate_fee_info(chain_id, estimated_tx_fees.into(), ctx.as_ref())
.await
.map(Json)
.map_err(|e| {
HandlerError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
})
}
Loading

0 comments on commit 9449b03

Please sign in to comment.