Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(evm): collaborative indexing through txid sharing #3833

Merged
merged 20 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 13 additions & 4 deletions rust/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};

use derive_new::new;
use hyperlane_core::MpmcReceiver;
use hyperlane_core::BroadcastReceiver;
use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::Mutex;
use tracing::{info, instrument};
use tracing::{debug, info, instrument};

use crate::server::MessageRetryRequest;

Expand All @@ -18,7 +18,7 @@ pub type QueueOperation = Box<dyn PendingOperation>;
pub struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
retry_rx: MpmcReceiver<MessageRetryRequest>,
retry_rx: BroadcastReceiver<MessageRetryRequest>,
#[new(default)]
queue: Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>,
}
Expand All @@ -41,7 +41,7 @@ impl OpQueue {
}

/// Pop multiple elements at once from the queue and update metrics
#[instrument(skip(self), ret, fields(queue_label=%self.queue_metrics_label), level = "debug")]
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
#[instrument(skip(self), fields(queue_label=%self.queue_metrics_label), level = "debug")]
pub async fn pop_many(&mut self, limit: usize) -> Vec<QueueOperation> {
self.process_retry_requests().await;
let mut queue = self.queue.lock().await;
Expand All @@ -55,6 +55,15 @@ impl OpQueue {
break;
}
}
// This function is called very often by the op_submitter tasks, so only log when there are operations to pop
// to avoid spamming the logs
if !popped.is_empty() {
debug!(
queue_label = %self.queue_metrics_label,
operations = ?popped,
"Popped OpQueue operations"
);
}
popped
}

Expand Down
6 changes: 3 additions & 3 deletions rust/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use tracing::{info, warn};

use hyperlane_base::CoreMetrics;
use hyperlane_core::{
BatchItem, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneDomainProtocol,
HyperlaneMessage, MpmcReceiver, TxOutcome,
BatchItem, BroadcastReceiver, ChainCommunicationError, ChainResult, HyperlaneDomain,
HyperlaneDomainProtocol, HyperlaneMessage, TxOutcome,
};

use crate::msg::pending_message::CONFIRM_DELAY;
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct SerialSubmitter {
/// Receiver for new messages to submit.
rx: mpsc::UnboundedReceiver<QueueOperation>,
/// Receiver for retry requests.
retry_rx: MpmcReceiver<MessageRetryRequest>,
retry_rx: BroadcastReceiver<MessageRetryRequest>,
/// Metrics for serial submitter.
metrics: SerialSubmitterMetrics,
/// Max batch size for submitting messages
Expand Down
5 changes: 5 additions & 0 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ impl ProcessorExt for MessageProcessor {
impl MessageProcessor {
fn try_get_unprocessed_message(&mut self) -> Result<Option<HyperlaneMessage>> {
loop {
println!(
"~~~ trying to get unprocessed message for domain and nonce {:?} {:?}",
self.domain(),
self.message_nonce
);
// First, see if we can find the message so we can update the gauge.
if let Some(message) = self.db.retrieve_message_by_nonce(self.message_nonce)? {
// Update the latest nonce gauges
Expand Down
64 changes: 53 additions & 11 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use hyperlane_base::{
metrics::{AgentMetrics, MetricsUpdater},
settings::ChainConf,
BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore,
SyncOptions,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, MpmcChannel,
MpmcReceiver, U256,
BroadcastReceiver, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment,
MerkleTreeInsertion, MpmcChannel, H512, U256,
};
use tokio::{
sync::{
Expand Down Expand Up @@ -130,7 +131,9 @@ impl BaseAgent for Relayer {

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
// each of these `contract_syncs` will return a receiver of txid alongside
// `contract_syncs` will also take a hashmap of domain -> recvs as an argument
let message_syncs: HashMap<_, Arc<dyn ContractSyncer<HyperlaneMessage>>> = settings
.contract_syncs::<HyperlaneMessage, _>(
settings.origin_chains.iter(),
&core_metrics,
Expand Down Expand Up @@ -158,6 +161,7 @@ impl BaseAgent for Relayer {
.map(|(k, v)| (k, v as _))
.collect();

// set the receivers for each domain and implement the `fetch_logs_by_tx_hash` for igp and merkle
let merkle_tree_hook_syncs = settings
.contract_syncs::<MerkleTreeInsertion, _>(
settings.origin_chains.iter(),
Expand Down Expand Up @@ -301,6 +305,18 @@ impl BaseAgent for Relayer {
.instrument(info_span!("Relayer server"));
tasks.push(server_task);
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved

let txid_receivers = self
.message_syncs
.iter()
.filter_map(|(k, v)| {
let maybe_rx = v.get_new_receive_tx_channel();
if maybe_rx.is_none() {
warn!("No txid receiver for chain {}", k);
}
maybe_rx.map(|rx| (k.clone(), rx))
})
.collect::<HashMap<_, _>>();

// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for (dest_domain, dest_conf) in &self.destination_chains {
Expand Down Expand Up @@ -335,8 +351,14 @@ impl BaseAgent for Relayer {

for origin in &self.origin_chains {
tasks.push(self.run_message_sync(origin).await);
tasks.push(self.run_interchain_gas_payment_sync(origin).await);
tasks.push(self.run_merkle_tree_hook_syncs(origin).await);
tasks.push(
self.run_interchain_gas_payment_sync(origin, txid_receivers.clone())
.await,
);
tasks.push(
self.run_merkle_tree_hook_syncs(origin, txid_receivers.clone())
.await,
);
}

// each message process attempts to send messages from a chain
Expand All @@ -362,7 +384,7 @@ impl Relayer {
tokio::spawn(async move {
contract_sync
.clone()
.sync("dispatched_messages", cursor)
.sync("dispatched_messages", cursor.into())
.await
})
.instrument(info_span!("MessageSync"))
Expand All @@ -371,6 +393,7 @@ impl Relayer {
async fn run_interchain_gas_payment_sync(
&self,
origin: &HyperlaneDomain,
mut rxs: HashMap<HyperlaneDomain, BroadcastReceiver<H512>>,
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
let contract_sync = self
Expand All @@ -379,19 +402,38 @@ impl Relayer {
.unwrap()
.clone();
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("gas_payments", cursor).await })
.instrument(info_span!("IgpSync"))
let origin_chain = origin.clone();
tokio::spawn(async move {
contract_sync
.clone()
.sync(
"gas_payments",
SyncOptions::new(Some(cursor), rxs.remove(&origin_chain)),
)
.await
})
.instrument(info_span!("IgpSync"))
}

async fn run_merkle_tree_hook_syncs(
&self,
origin: &HyperlaneDomain,
mut rxs: HashMap<HyperlaneDomain, BroadcastReceiver<H512>>,
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await })
.instrument(info_span!("MerkleTreeHookSync"))
let origin_chain = origin.clone();
tokio::spawn(async move {
contract_sync
.clone()
.sync(
"merkle_tree_hook",
SyncOptions::new(Some(cursor), rxs.remove(&origin_chain)),
)
.await
})
.instrument(info_span!("MerkleTreeHookSync"))
}

fn run_message_processor(
Expand Down Expand Up @@ -455,7 +497,7 @@ impl Relayer {
&self,
destination: &HyperlaneDomain,
receiver: UnboundedReceiver<QueueOperation>,
retry_receiver_channel: MpmcReceiver<MessageRetryRequest>,
retry_receiver_channel: BroadcastReceiver<MessageRetryRequest>,
batch_size: u32,
) -> Instrumented<JoinHandle<()>> {
let serial_submitter = SerialSubmitter::new(
Expand Down
4 changes: 2 additions & 2 deletions rust/agents/relayer/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ mod tests {
use super::*;
use axum::http::StatusCode;
use ethers::utils::hex::ToHex;
use hyperlane_core::{MpmcChannel, MpmcReceiver};
use hyperlane_core::{BroadcastReceiver, MpmcChannel};
use std::net::SocketAddr;

fn setup_test_server() -> (SocketAddr, MpmcReceiver<MessageRetryRequest>) {
fn setup_test_server() -> (SocketAddr, BroadcastReceiver<MessageRetryRequest>) {
let mpmc_channel = MpmcChannel::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let message_retry_api = MessageRetryApi::new(mpmc_channel.sender());
let (path, retry_router) = message_retry_api.get_route();
Expand Down
6 changes: 3 additions & 3 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Scraper {
.await
.unwrap();
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move { sync.sync("message_dispatch", cursor).await }).instrument(
tokio::spawn(async move { sync.sync("message_dispatch", cursor.into()).await }).instrument(
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
info_span!("ChainContractSync", chain=%domain.name(), event="message_dispatch"),
)
}
Expand All @@ -221,7 +221,7 @@ impl Scraper {

let label = "message_delivery";
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move { sync.sync(label, cursor).await })
tokio::spawn(async move { sync.sync(label, cursor.into()).await })
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
}

Expand All @@ -247,7 +247,7 @@ impl Scraper {

let label = "gas_payment";
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move { sync.sync(label, cursor).await })
tokio::spawn(async move { sync.sync(label, cursor.into()).await })
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
}
}
5 changes: 4 additions & 1 deletion rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ impl Validator {
let contract_sync = self.merkle_tree_hook_sync.clone();
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move {
contract_sync.clone().sync("merkle_tree_hook", cursor).await;
contract_sync
.clone()
.sync("merkle_tree_hook", cursor.into())
.await;
})
.instrument(info_span!("MerkleTreeHookSyncer"))
}
Expand Down
2 changes: 1 addition & 1 deletion rust/chains/hyperlane-cosmos/src/interchain_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl CosmosInterchainGasPaymasterIndexer {

#[async_trait]
impl Indexer<InterchainGasPayment> for CosmosInterchainGasPaymasterIndexer {
async fn fetch_logs(
async fn fetch_logs_in_range(
&self,
range: RangeInclusive<u32>,
) -> ChainResult<Vec<(Indexed<InterchainGasPayment>, LogMeta)>> {
Expand Down
4 changes: 2 additions & 2 deletions rust/chains/hyperlane-cosmos/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl CosmosMailboxIndexer {

#[async_trait]
impl Indexer<HyperlaneMessage> for CosmosMailboxIndexer {
async fn fetch_logs(
async fn fetch_logs_in_range(
&self,
range: RangeInclusive<u32>,
) -> ChainResult<Vec<(Indexed<HyperlaneMessage>, LogMeta)>> {
Expand Down Expand Up @@ -397,7 +397,7 @@ impl Indexer<HyperlaneMessage> for CosmosMailboxIndexer {

#[async_trait]
impl Indexer<H256> for CosmosMailboxIndexer {
async fn fetch_logs(
async fn fetch_logs_in_range(
&self,
range: RangeInclusive<u32>,
) -> ChainResult<Vec<(Indexed<H256>, LogMeta)>> {
Expand Down
2 changes: 1 addition & 1 deletion rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl CosmosMerkleTreeHookIndexer {
#[async_trait]
impl Indexer<MerkleTreeInsertion> for CosmosMerkleTreeHookIndexer {
/// Fetch list of logs between `range` of blocks
async fn fetch_logs(
async fn fetch_logs_in_range(
&self,
range: RangeInclusive<u32>,
) -> ChainResult<Vec<(Indexed<MerkleTreeInsertion>, LogMeta)>> {
Expand Down
56 changes: 53 additions & 3 deletions rust/chains/hyperlane-ethereum/src/contracts/interchain_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ use std::ops::RangeInclusive;
use std::sync::Arc;

use async_trait::async_trait;
use ethers::abi::RawLog;
use ethers::prelude::Middleware;
use ethers_contract::{ContractError, EthLogDecode, LogMeta as EthersLogMeta};
use ethers_core::types::H256 as EthersH256;
use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneAbi, HyperlaneChain,
HyperlaneContract, HyperlaneDomain, HyperlaneProvider, Indexed, Indexer,
InterchainGasPaymaster, InterchainGasPayment, LogMeta, SequenceAwareIndexer, H160, H256,
InterchainGasPaymaster, InterchainGasPayment, LogMeta, SequenceAwareIndexer, H160, H256, H512,
};
use tracing::instrument;

use crate::interfaces::i_interchain_gas_paymaster::{
IInterchainGasPaymaster as EthereumInterchainGasPaymasterInternal, IINTERCHAINGASPAYMASTER_ABI,
GasPaymentFilter, IInterchainGasPaymaster as EthereumInterchainGasPaymasterInternal,
IINTERCHAINGASPAYMASTER_ABI,
};
use crate::{BuildableWithProvider, ConnectionConf, EthereumProvider};

Expand Down Expand Up @@ -86,7 +90,7 @@ where
{
/// Note: This call may return duplicates depending on the provider used
#[instrument(err, skip(self))]
async fn fetch_logs(
async fn fetch_logs_in_range(
&self,
range: RangeInclusive<u32>,
) -> ChainResult<Vec<(Indexed<InterchainGasPayment>, LogMeta)>> {
Expand Down Expand Up @@ -124,6 +128,52 @@ where
.as_u32()
.saturating_sub(self.reorg_period))
}

async fn fetch_logs_by_tx_hash(
&self,
tx_hash: H512,
) -> ChainResult<Vec<(Indexed<InterchainGasPayment>, LogMeta)>> {
let ethers_tx_hash: EthersH256 = tx_hash.into();
let receipt = self
.provider
.get_transaction_receipt(ethers_tx_hash)
.await
.map_err(|err| ContractError::<M>::MiddlewareError(err))?;
println!("~~~ igp receipt: {:?}", receipt);
let Some(receipt) = receipt else {
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
return Ok(vec![]);
};

let logs: Vec<_> = receipt
.logs
.into_iter()
.filter_map(|log| {
let raw_log = RawLog {
topics: log.topics.clone(),
data: log.data.to_vec(),
};
let log_meta: EthersLogMeta = (&log).into();
let gas_payment_filter = GasPaymentFilter::decode_log(&raw_log).ok();
tkporter marked this conversation as resolved.
Show resolved Hide resolved
gas_payment_filter.map(|log| {
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
(
Indexed::new(InterchainGasPayment {
message_id: H256::from(log.message_id),
destination: log.destination_domain,
payment: log.payment.into(),
gas_amount: log.gas_amount.into(),
}),
log_meta.into(),
)
})
})
.collect();
println!(
"~~~ found igp logs with tx id {:?}: {:?}",
tx_hash,
logs.len()
);
Ok(logs)
}
}

#[async_trait]
Expand Down
Loading