From 87e7a4c55015a09049ebe4eb418eb093ce9b44c3 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Thu, 9 May 2024 17:48:02 +0100 Subject: [PATCH 01/10] initial tokio-metrics instrumentation --- rust/Cargo.lock | 13 ++++ rust/Cargo.toml | 4 +- rust/agents/relayer/Cargo.toml | 1 + rust/agents/relayer/src/msg/op_submitter.rs | 65 ++++++++++------- rust/agents/relayer/src/processor.rs | 7 +- rust/agents/relayer/src/relayer.rs | 77 ++++++++++++++++----- 6 files changed, 123 insertions(+), 44 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d8b69f4ea4..e2ac91eb58 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -6968,6 +6968,7 @@ dependencies = [ "strum 0.25.0", "thiserror", "tokio", + "tokio-metrics", "tokio-test", "tracing", "tracing-futures", @@ -9956,6 +9957,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.5.5", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] @@ -9980,6 +9982,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio-stream", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0322d76292..046585dd6e 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -65,6 +65,7 @@ bytes = "1" clap = "4" color-eyre = "0.6" config = "0.13.3" +console-subscriber = "0.2.0" convert_case = "0.6" cosmrs = { version = "0.14", default-features = false, features = [ "cosmwasm", @@ -170,7 +171,8 @@ tendermint-rpc = { version = "0.32.0", features = ["http-client", "tokio"] } thiserror = "1.0" time = "0.3" tiny-keccak = "2.0.2" -tokio = { version = "1", features = ["parking_lot"] } +tokio = { version = "1", features = ["parking_lot", "tracing"] } +tokio-metrics = { version = "0.3.1", default-features = false } tokio-test = "0.4" toml_edit = "0.19.14" tonic = "0.9.2" diff --git a/rust/agents/relayer/Cargo.toml b/rust/agents/relayer/Cargo.toml index 59b1c74d6b..78bb74289a 100644 --- a/rust/agents/relayer/Cargo.toml +++ b/rust/agents/relayer/Cargo.toml @@ -32,6 +32,7 @@ serde_json.workspace = true strum.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["rt", "macros", "parking_lot", "rt-multi-thread"] } +tokio-metrics.workspace = true tracing-futures.workspace = true tracing.workspace = true diff --git a/rust/agents/relayer/src/msg/op_submitter.rs b/rust/agents/relayer/src/msg/op_submitter.rs index 4350baeff2..dc30911490 100644 --- a/rust/agents/relayer/src/msg/op_submitter.rs +++ b/rust/agents/relayer/src/msg/op_submitter.rs @@ -4,10 +4,10 @@ use derive_new::new; use futures::future::join_all; use futures_util::future::try_join_all; use prometheus::{IntCounter, IntGaugeVec}; -use tokio::spawn; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::sleep; +use tokio_metrics::TaskMonitor; use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument}; use tracing::{info, warn}; @@ -82,12 +82,18 @@ pub struct SerialSubmitter { metrics: SerialSubmitterMetrics, /// Max batch size for submitting messages max_batch_size: u32, + /// tokio task monitor + task_monitor: TaskMonitor, } impl SerialSubmitter { pub fn spawn(self) -> Instrumented> { let span = info_span!("SerialSubmitter", destination=%self.domain); - spawn(async move { self.run().await }).instrument(span) + let task_monitor = self.task_monitor.clone(); + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + self.run().await + })) + .instrument(span) } async fn run(self) { @@ -97,6 +103,7 @@ impl SerialSubmitter { rx: rx_prepare, retry_rx, max_batch_size, + task_monitor, } = self; let prepare_queue = OpQueue::new( metrics.submitter_queue_length.clone(), @@ -115,32 +122,40 @@ impl SerialSubmitter { ); let tasks = [ - spawn(receive_task( - domain.clone(), - rx_prepare, - prepare_queue.clone(), + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + receive_task(domain.clone(), rx_prepare, prepare_queue.clone()), )), - spawn(prepare_task( - domain.clone(), - prepare_queue.clone(), - submit_queue.clone(), - confirm_queue.clone(), - max_batch_size, - metrics.clone(), + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + prepare_task( + domain.clone(), + prepare_queue.clone(), + submit_queue.clone(), + confirm_queue.clone(), + max_batch_size, + metrics.clone(), + ), )), - spawn(submit_task( - domain.clone(), - submit_queue, - confirm_queue.clone(), - max_batch_size, - metrics.clone(), + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + submit_task( + domain.clone(), + submit_queue, + confirm_queue.clone(), + max_batch_size, + metrics.clone(), + ), )), - spawn(confirm_task( - domain.clone(), - prepare_queue, - confirm_queue, - max_batch_size, - metrics, + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + confirm_task( + domain.clone(), + prepare_queue, + confirm_queue, + max_batch_size, + metrics, + ), )), ]; diff --git a/rust/agents/relayer/src/processor.rs b/rust/agents/relayer/src/processor.rs index 56dbe3eb8d..c38449cae1 100644 --- a/rust/agents/relayer/src/processor.rs +++ b/rust/agents/relayer/src/processor.rs @@ -5,6 +5,7 @@ use derive_new::new; use eyre::Result; use hyperlane_core::HyperlaneDomain; use tokio::task::JoinHandle; +use tokio_metrics::TaskMonitor; use tracing::{instrument, warn}; #[async_trait] @@ -20,11 +21,15 @@ pub trait ProcessorExt: Send + Debug { #[derive(new)] pub struct Processor { ticker: Box, + task_monitor: TaskMonitor, } impl Processor { pub fn spawn(self) -> JoinHandle<()> { - tokio::spawn(async move { self.main_loop().await }) + let task_monitor = self.task_monitor.clone(); + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + self.main_loop().await + })) } #[instrument(ret, skip(self), level = "info", fields(domain=%self.ticker.domain()))] diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 581620f2f7..be9c2ea567 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::{Debug, Formatter}, sync::Arc, + time::Duration, }; use async_trait::async_trait; @@ -25,6 +26,7 @@ use tokio::{ }, task::JoinHandle, }; +use tokio_metrics::TaskMonitor; use tracing::{info, info_span, instrument::Instrumented, warn, Instrument}; use crate::{ @@ -287,6 +289,19 @@ impl BaseAgent for Relayer { async fn run(self) { let mut tasks = vec![]; + let task_monitor = tokio_metrics::TaskMonitor::new(); + { + let task_monitor = task_monitor.clone(); + tokio::spawn(async move { + for interval in task_monitor.intervals() { + // pretty-print the metric interval + info!(interval=?interval, "Tokio metrics"); + // wait 500ms + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); + } + // run server let mpmc_channel = MpmcChannel::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); let custom_routes = relayer_server::routes(mpmc_channel.sender()); @@ -318,6 +333,7 @@ impl BaseAgent for Relayer { .operation_batch_config() .map(|c| c.max_batch_size) .unwrap_or(1), + task_monitor.clone(), ), ); @@ -334,15 +350,25 @@ impl BaseAgent for Relayer { } for origin in &self.origin_chains { - tasks.push(self.run_message_sync(origin).await); - tasks.push(self.run_interchain_gas_payment_sync(origin).await); - tasks.push(self.run_merkle_tree_hook_syncs(origin).await); + tasks.push(self.run_message_sync(origin, task_monitor.clone()).await); + tasks.push( + self.run_interchain_gas_payment_sync(origin, task_monitor.clone()) + .await, + ); + tasks.push( + self.run_merkle_tree_hook_syncs(origin, task_monitor.clone()) + .await, + ); } // each message process attempts to send messages from a chain for origin in &self.origin_chains { - tasks.push(self.run_message_processor(origin, send_channels.clone())); - tasks.push(self.run_merkle_tree_processor(origin)); + tasks.push(self.run_message_processor( + origin, + send_channels.clone(), + task_monitor.clone(), + )); + tasks.push(self.run_merkle_tree_processor(origin, task_monitor.clone())); } if let Err(err) = try_join_all(tasks).await { @@ -355,22 +381,27 @@ impl BaseAgent for Relayer { } impl Relayer { - async fn run_message_sync(&self, origin: &HyperlaneDomain) -> Instrumented> { + async fn run_message_sync( + &self, + origin: &HyperlaneDomain, + task_monitor: TaskMonitor, + ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self.message_syncs.get(origin).unwrap().clone(); let cursor = contract_sync.cursor(index_settings).await; - tokio::spawn(async move { + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() .sync("dispatched_messages", cursor) .await - }) + })) .instrument(info_span!("MessageSync")) } async fn run_interchain_gas_payment_sync( &self, origin: &HyperlaneDomain, + task_monitor: TaskMonitor, ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self @@ -379,25 +410,31 @@ impl Relayer { .unwrap() .clone(); let cursor = contract_sync.cursor(index_settings).await; - tokio::spawn(async move { contract_sync.clone().sync("gas_payments", cursor).await }) - .instrument(info_span!("IgpSync")) + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + contract_sync.clone().sync("gas_payments", cursor).await + })) + .instrument(info_span!("IgpSync")) } async fn run_merkle_tree_hook_syncs( &self, origin: &HyperlaneDomain, + task_monitor: TaskMonitor, ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); let cursor = contract_sync.cursor(index_settings).await; - tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) - .instrument(info_span!("MerkleTreeHookSync")) + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + contract_sync.clone().sync("merkle_tree_hook", cursor).await + })) + .instrument(info_span!("MerkleTreeHookSync")) } fn run_message_processor( &self, origin: &HyperlaneDomain, send_channels: HashMap>, + task_monitor: TaskMonitor, ) -> Instrumented> { let metrics = MessageProcessorMetrics::new( &self.core.metrics, @@ -431,12 +468,16 @@ impl Relayer { ); let span = info_span!("MessageProcessor", origin=%message_processor.domain()); - let processor = Processor::new(Box::new(message_processor)); + let processor = Processor::new(Box::new(message_processor), task_monitor.clone()); processor.spawn().instrument(span) } - fn run_merkle_tree_processor(&self, origin: &HyperlaneDomain) -> Instrumented> { + fn run_merkle_tree_processor( + &self, + origin: &HyperlaneDomain, + task_monitor: TaskMonitor, + ) -> Instrumented> { let metrics = MerkleTreeProcessorMetrics::new(); let merkle_tree_processor = MerkleTreeProcessor::new( self.dbs.get(origin).unwrap().clone(), @@ -445,7 +486,7 @@ impl Relayer { ); let span = info_span!("MerkleTreeProcessor", origin=%merkle_tree_processor.domain()); - let processor = Processor::new(Box::new(merkle_tree_processor)); + let processor = Processor::new(Box::new(merkle_tree_processor), task_monitor.clone()); processor.spawn().instrument(span) } @@ -457,6 +498,7 @@ impl Relayer { receiver: UnboundedReceiver, retry_receiver_channel: MpmcReceiver, batch_size: u32, + task_monitor: TaskMonitor, ) -> Instrumented> { let serial_submitter = SerialSubmitter::new( destination.clone(), @@ -464,10 +506,11 @@ impl Relayer { retry_receiver_channel, SerialSubmitterMetrics::new(&self.core.metrics, destination), batch_size, + task_monitor.clone(), ); let span = info_span!("SerialSubmitter", destination=%destination); let destination = destination.clone(); - tokio::spawn(async move { + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { // Propagate task panics serial_submitter.spawn().await.unwrap_or_else(|err| { panic!( @@ -475,7 +518,7 @@ impl Relayer { destination, err ) }); - }) + })) .instrument(span) } } From 622cdaa3e2461ebaa9e031d7a838ffffef145654 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Fri, 10 May 2024 15:19:03 +0100 Subject: [PATCH 02/10] feat(relayer): expose tokio console server and logs --- rust/.cargo/config.toml | 2 + rust/Cargo.lock | 95 +++++++++++++++++-- rust/agents/relayer/Cargo.toml | 1 + rust/agents/relayer/src/relayer.rs | 28 +++--- rust/agents/scraper/Cargo.toml | 1 + rust/agents/scraper/src/agent.rs | 1 + rust/agents/validator/Cargo.toml | 1 + rust/agents/validator/src/validator.rs | 1 + rust/hyperlane-base/Cargo.toml | 1 + rust/hyperlane-base/src/agent.rs | 12 ++- rust/hyperlane-base/src/settings/trace/mod.rs | 6 +- 11 files changed, 123 insertions(+), 26 deletions(-) create mode 100644 rust/.cargo/config.toml diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml new file mode 100644 index 0000000000..bff29e6e17 --- /dev/null +++ b/rust/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/rust/Cargo.lock b/rust/Cargo.lock index e2ac91eb58..dd039b7d99 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1347,6 +1347,43 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost 0.12.4", + "prost-types 0.12.4", + "tonic 0.10.2", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types 0.12.4", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.10.2", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -1451,7 +1488,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73c9d2043a9e617b0d602fbc0a0ecd621568edbf3a9774890a6d562389bd8e1c" dependencies = [ "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "tendermint-proto 0.32.2 (registry+https://github.com/rust-lang/crates.io-index)", "tonic 0.9.2", ] @@ -3793,6 +3830,19 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.9" @@ -4116,6 +4166,7 @@ dependencies = [ "bs58 0.5.0", "color-eyre", "config", + "console-subscriber", "convert_case 0.6.0", "derive-new", "derive_builder", @@ -4946,7 +4997,7 @@ dependencies = [ "cosmwasm-std", "osmosis-std-derive", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "schemars", "serde", "serde-cw-value", @@ -6534,16 +6585,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", ] [[package]] name = "prost" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" dependencies = [ "bytes", + "prost-derive 0.12.5", ] [[package]] @@ -6559,6 +6611,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" +dependencies = [ + "anyhow", + "itertools 0.12.0", + "proc-macro2 1.0.76", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -6568,6 +6633,15 @@ dependencies = [ "prost 0.11.9", ] +[[package]] +name = "prost-types" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" +dependencies = [ + "prost 0.12.4", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -6944,6 +7018,7 @@ dependencies = [ "async-trait", "axum", "config", + "console-subscriber", "convert_case 0.6.0", "derive-new", "derive_more", @@ -7631,6 +7706,7 @@ version = "0.1.0" dependencies = [ "async-trait", "config", + "console-subscriber", "derive_more", "ethers", "eyre", @@ -9701,7 +9777,7 @@ dependencies = [ "num-traits", "once_cell", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "ripemd", "serde", "serde_bytes", @@ -9740,7 +9816,7 @@ dependencies = [ "num-derive 0.3.3", "num-traits", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "serde", "serde_bytes", "subtle-encoding", @@ -9757,7 +9833,7 @@ dependencies = [ "num-derive 0.3.3", "num-traits", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "serde", "serde_bytes", "subtle-encoding", @@ -10230,7 +10306,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost 0.12.3", + "prost 0.12.4", "rustls 0.21.10", "rustls-native-certs 0.6.3", "rustls-pemfile 1.0.4", @@ -10651,6 +10727,7 @@ dependencies = [ "async-trait", "axum", "config", + "console-subscriber", "derive-new", "derive_more", "ethers", diff --git a/rust/agents/relayer/Cargo.toml b/rust/agents/relayer/Cargo.toml index 78bb74289a..5c80952089 100644 --- a/rust/agents/relayer/Cargo.toml +++ b/rust/agents/relayer/Cargo.toml @@ -13,6 +13,7 @@ version.workspace = true async-trait.workspace = true axum.workspace = true config.workspace = true +console-subscriber.workspace = true convert_case.workspace = true derive-new.workspace = true derive_more.workspace = true diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index be9c2ea567..0496e38cac 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -2,7 +2,6 @@ use std::{ collections::{HashMap, HashSet}, fmt::{Debug, Formatter}, sync::Arc, - time::Duration, }; use async_trait::async_trait; @@ -27,7 +26,7 @@ use tokio::{ task::JoinHandle, }; use tokio_metrics::TaskMonitor; -use tracing::{info, info_span, instrument::Instrumented, warn, Instrument}; +use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument}; use crate::{ merkle_tree::builder::MerkleTreeBuilder, @@ -81,6 +80,8 @@ pub struct Relayer { // or move them in `core_metrics`, like the validator metrics agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + /// Tokio console server + pub tokio_console_server: Option, } impl Debug for Relayer { @@ -111,6 +112,7 @@ impl BaseAgent for Relayer { core_metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + tokio_console_server: console_subscriber::Server, ) -> Result where Self: Sized, @@ -282,24 +284,24 @@ impl BaseAgent for Relayer { core_metrics, agent_metrics, chain_metrics, + tokio_console_server: Some(tokio_console_server), }) } #[allow(clippy::async_yields_async)] - async fn run(self) { + async fn run(mut self) { let mut tasks = vec![]; let task_monitor = tokio_metrics::TaskMonitor::new(); - { - let task_monitor = task_monitor.clone(); - tokio::spawn(async move { - for interval in task_monitor.intervals() { - // pretty-print the metric interval - info!(interval=?interval, "Tokio metrics"); - // wait 500ms - tokio::time::sleep(Duration::from_millis(500)).await; - } - }); + if let Some(tokio_console_server) = self.tokio_console_server.take() { + let console_server = + tokio::spawn(TaskMonitor::instrument(&task_monitor.clone(), async move { + info!("Starting tokio console server"); + if let Err(e) = tokio_console_server.serve().await { + error!(error=?e, "Tokio console server failed to start"); + } + })); + tasks.push(console_server.instrument(info_span!("Tokio console server"))); } // run server diff --git a/rust/agents/scraper/Cargo.toml b/rust/agents/scraper/Cargo.toml index 587a56b8dd..2348135731 100644 --- a/rust/agents/scraper/Cargo.toml +++ b/rust/agents/scraper/Cargo.toml @@ -12,6 +12,7 @@ version.workspace = true [dependencies] async-trait.workspace = true config.workspace = true +console-subscriber.workspace = true derive_more.workspace = true ethers.workspace = true eyre.workspace = true diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index cc113cacfd..d713432819 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -44,6 +44,7 @@ impl BaseAgent for Scraper { metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + _tokio_console_server: console_subscriber::Server, ) -> eyre::Result where Self: Sized, diff --git a/rust/agents/validator/Cargo.toml b/rust/agents/validator/Cargo.toml index 98a5fe6f83..e9e66eb303 100644 --- a/rust/agents/validator/Cargo.toml +++ b/rust/agents/validator/Cargo.toml @@ -13,6 +13,7 @@ version.workspace = true async-trait.workspace = true axum.workspace = true config.workspace = true +console-subscriber.workspace = true derive_more.workspace = true derive-new.workspace = true ethers.workspace = true diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 8d28980098..043ac9249d 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -63,6 +63,7 @@ impl BaseAgent for Validator { metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + _tokio_console_server: console_subscriber::Server, ) -> Result where Self: Sized, diff --git a/rust/hyperlane-base/Cargo.toml b/rust/hyperlane-base/Cargo.toml index 97d84b6221..9f401b399f 100644 --- a/rust/hyperlane-base/Cargo.toml +++ b/rust/hyperlane-base/Cargo.toml @@ -15,6 +15,7 @@ axum.workspace = true bs58.workspace = true color-eyre = { workspace = true, optional = true } config.workspace = true +console-subscriber.workspace = true convert_case.workspace = true derive_builder.workspace = true derive-new.workspace = true diff --git a/rust/hyperlane-base/src/agent.rs b/rust/hyperlane-base/src/agent.rs index 5f6b504e9a..153526d584 100644 --- a/rust/hyperlane-base/src/agent.rs +++ b/rust/hyperlane-base/src/agent.rs @@ -44,6 +44,7 @@ pub trait BaseAgent: Send + Sync + Debug { metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + tokio_console_server: console_subscriber::Server, ) -> Result where Self: Sized; @@ -75,10 +76,17 @@ pub async fn agent_main() -> Result<()> { let core_settings: &Settings = settings.as_ref(); let metrics = settings.as_ref().metrics(A::AGENT_NAME)?; - core_settings.tracing.start_tracing(&metrics)?; + let tokio_server = core_settings.tracing.start_tracing(&metrics)?; let agent_metrics = create_agent_metrics(&metrics)?; let chain_metrics = create_chain_metrics(&metrics)?; - let agent = A::from_settings(settings, metrics.clone(), agent_metrics, chain_metrics).await?; + let agent = A::from_settings( + settings, + metrics.clone(), + agent_metrics, + chain_metrics, + tokio_server, + ) + .await?; // This await will only end if a panic happens. We won't crash, but instead gracefully shut down agent.run().await; diff --git a/rust/hyperlane-base/src/settings/trace/mod.rs b/rust/hyperlane-base/src/settings/trace/mod.rs index b0640f36ee..7d64e93dd7 100644 --- a/rust/hyperlane-base/src/settings/trace/mod.rs +++ b/rust/hyperlane-base/src/settings/trace/mod.rs @@ -60,7 +60,7 @@ pub struct TracingConfig { impl TracingConfig { /// Attempt to instantiate and register a tracing subscriber setup from /// settings. - pub fn start_tracing(&self, metrics: &CoreMetrics) -> Result<()> { + pub fn start_tracing(&self, metrics: &CoreMetrics) -> Result { let mut target_layer = Targets::new().with_default(self.level); if self.level < Level::DependencyTrace { @@ -85,13 +85,15 @@ impl TracingConfig { let fmt_layer: LogOutputLayer<_> = self.fmt.into(); let err_layer = tracing_error::ErrorLayer::default(); + let (tokio_layer, tokio_server) = console_subscriber::ConsoleLayer::new(); let subscriber = tracing_subscriber::Registry::default() + .with(tokio_layer) .with(target_layer) .with(TimeSpanLifetime::new(metrics)) .with(fmt_layer) .with(err_layer); subscriber.try_init()?; - Ok(()) + Ok(tokio_server) } } From 03c6c9f26cb3e0b77fa7500339e9ce16de7f5152 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Fri, 10 May 2024 18:02:46 +0100 Subject: [PATCH 03/10] fix: unit test --- rust/agents/relayer/src/msg/processor.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 3aae0d308c..3e3b5aa61d 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -209,6 +209,7 @@ mod test { }, time::sleep, }; + use tokio_metrics::TaskMonitor; fn dummy_processor_metrics(domain_id: u32) -> MessageProcessorMetrics { MessageProcessorMetrics { @@ -367,7 +368,7 @@ mod test { let (message_processor, mut receive_channel) = dummy_message_processor(origin_domain, destination_domain, db); - let processor = Processor::new(Box::new(message_processor)); + let processor = Processor::new(Box::new(message_processor), TaskMonitor::new()); let process_fut = processor.spawn(); let mut pending_messages = vec![]; let pending_message_accumulator = async { From 2c7fc4c508ca6462930faeca8fc6df4727f8b1d1 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 13 May 2024 11:06:40 +0100 Subject: [PATCH 04/10] chore: get tokio console to work with our docker build --- rust/agents/relayer/.cargo/config.toml | 2 ++ typescript/infra/config/environments/mainnet3/agent.ts | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 rust/agents/relayer/.cargo/config.toml diff --git a/rust/agents/relayer/.cargo/config.toml b/rust/agents/relayer/.cargo/config.toml new file mode 100644 index 0000000000..bff29e6e17 --- /dev/null +++ b/rust/agents/relayer/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/typescript/infra/config/environments/mainnet3/agent.ts b/typescript/infra/config/environments/mainnet3/agent.ts index b65dfbc774..b1170952e5 100644 --- a/typescript/infra/config/environments/mainnet3/agent.ts +++ b/typescript/infra/config/environments/mainnet3/agent.ts @@ -233,7 +233,7 @@ const releaseCandidate: RootAgentConfig = { rpcConsensusType: RpcConsensusType.Fallback, docker: { repo, - tag: '3012392-20240507-130024', + tag: '16e1dd1-20240510-170423', }, // We're temporarily (ab)using the RC relayer as a way to increase // message throughput. From 42a9f2257894bf9bf45ba881c8c22bd222e5c7ea Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 13 May 2024 11:49:46 +0100 Subject: [PATCH 05/10] set build flag directly in dockerfile --- rust/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/Dockerfile b/rust/Dockerfile index f931c0fa78..7a5c882608 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -27,7 +27,7 @@ RUN \ --mount=id=cargo,type=cache,sharing=locked,target=/usr/src/target \ --mount=id=cargo-home-registry,type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=id=cargo-home-git,type=cache,sharing=locked,target=/usr/local/cargo/git \ - cargo build --release --bin validator --bin relayer --bin scraper && \ + RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin validator --bin relayer --bin scraper && \ mkdir -p /release && \ cp /usr/src/target/release/validator /release && \ cp /usr/src/target/release/relayer /release && \ From 55b808e1f12ae5526591148467f9c5212219df54 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 13 May 2024 13:17:02 +0100 Subject: [PATCH 06/10] build agent images in debug mode --- rust/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/Dockerfile b/rust/Dockerfile index 7a5c882608..0a9eab1151 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -27,11 +27,11 @@ RUN \ --mount=id=cargo,type=cache,sharing=locked,target=/usr/src/target \ --mount=id=cargo-home-registry,type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=id=cargo-home-git,type=cache,sharing=locked,target=/usr/local/cargo/git \ - RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin validator --bin relayer --bin scraper && \ + RUSTFLAGS="--cfg tokio_unstable" cargo build --bin validator --bin relayer --bin scraper && \ mkdir -p /release && \ - cp /usr/src/target/release/validator /release && \ - cp /usr/src/target/release/relayer /release && \ - cp /usr/src/target/release/scraper /release + cp /usr/src/target/debug/validator /release && \ + cp /usr/src/target/debug/relayer /release && \ + cp /usr/src/target/debug/scraper /release ## 2: Copy the binaries to release image FROM ubuntu:22.04 From a70fcbd88ab59ec4e4acdd2bf9e66cc839b79dbb Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Tue, 14 May 2024 18:04:16 +0100 Subject: [PATCH 07/10] feat: forward-backward message processor --- .../relayer/src/merkle_tree/processor.rs | 20 ++- rust/agents/relayer/src/msg/processor.rs | 123 +++++++++++++----- .../src/db/rocks/hyperlane_db.rs | 38 ++++++ 3 files changed, 147 insertions(+), 34 deletions(-) diff --git a/rust/agents/relayer/src/merkle_tree/processor.rs b/rust/agents/relayer/src/merkle_tree/processor.rs index 251e8e7c6b..bddd5382a5 100644 --- a/rust/agents/relayer/src/merkle_tree/processor.rs +++ b/rust/agents/relayer/src/merkle_tree/processor.rs @@ -5,7 +5,6 @@ use std::{ }; use async_trait::async_trait; -use derive_new::new; use eyre::Result; use hyperlane_base::db::HyperlaneRocksDB; use hyperlane_core::{HyperlaneDomain, MerkleTreeInsertion}; @@ -18,12 +17,10 @@ use crate::processor::ProcessorExt; use super::builder::MerkleTreeBuilder; /// Finds unprocessed merkle tree insertions and adds them to the prover sync -#[derive(new)] pub struct MerkleTreeProcessor { db: HyperlaneRocksDB, metrics: MerkleTreeProcessorMetrics, prover_sync: Arc>, - #[new(default)] leaf_index: u32, } @@ -65,6 +62,23 @@ impl ProcessorExt for MerkleTreeProcessor { } impl MerkleTreeProcessor { + pub fn new( + db: HyperlaneRocksDB, + prover_sync: Arc>, + metrics: MerkleTreeProcessorMetrics, + ) -> Self { + Self { + db, + prover_sync, + metrics, + leaf_index: db + .retrieve_highest_processed_igp_sequence() + .ok() + .flatten() + .unwrap_or(0), + } + } + fn next_unprocessed_leaf(&mut self) -> Result> { let leaf = if let Some(insertion) = self .db diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 3e3b5aa61d..58f172a1d9 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -6,7 +6,6 @@ use std::{ }; use async_trait::async_trait; -use derive_new::new; use eyre::Result; use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics}; use hyperlane_core::{HyperlaneDomain, HyperlaneMessage}; @@ -20,7 +19,6 @@ use crate::{processor::ProcessorExt, settings::matching_list::MatchingList}; /// Finds unprocessed messages from an origin and submits then through a channel /// for to the appropriate destination. #[allow(clippy::too_many_arguments)] -#[derive(new)] pub struct MessageProcessor { db: HyperlaneRocksDB, whitelist: Arc, @@ -32,8 +30,8 @@ pub struct MessageProcessor { /// Needed context to send a message for each destination chain destination_ctxs: HashMap>, metric_app_contexts: Vec<(MatchingList, String)>, - #[new(default)] - message_nonce: u32, + highest_message_nonce: u32, + lowest_message_nonce: u32, } impl Debug for MessageProcessor { @@ -41,7 +39,7 @@ impl Debug for MessageProcessor { write!( f, "MessageProcessor {{ whitelist: {:?}, blacklist: {:?}, message_nonce: {:?} }}", - self.whitelist, self.blacklist, self.message_nonce + self.whitelist, self.blacklist, self.highest_message_nonce ) } } @@ -68,28 +66,28 @@ impl ProcessorExt for MessageProcessor { // Skip if not whitelisted. if !self.whitelist.msg_matches(&msg, true) { debug!(?msg, whitelist=?self.whitelist, "Message not whitelisted, skipping"); - self.message_nonce += 1; + self.highest_message_nonce += 1; return Ok(()); } // Skip if the message is blacklisted if self.blacklist.msg_matches(&msg, false) { debug!(?msg, blacklist=?self.blacklist, "Message blacklisted, skipping"); - self.message_nonce += 1; + self.highest_message_nonce += 1; return Ok(()); } // Skip if the message is intended for this origin if destination == self.domain().id() { debug!(?msg, "Message destined for self, skipping"); - self.message_nonce += 1; + self.highest_message_nonce += 1; return Ok(()); } // Skip if the message is intended for a destination we do not service if !self.send_channels.contains_key(&destination) { debug!(?msg, "Message destined for unknown domain, skipping"); - self.message_nonce += 1; + self.highest_message_nonce += 1; return Ok(()); } @@ -106,7 +104,7 @@ impl ProcessorExt for MessageProcessor { app_context, ); self.send_channels[&destination].send(Box::new(pending_msg) as QueueOperation)?; - self.message_nonce += 1; + self.highest_message_nonce += 1; } else { tokio::time::sleep(Duration::from_secs(1)).await; } @@ -115,35 +113,98 @@ impl ProcessorExt for MessageProcessor { } impl MessageProcessor { + pub fn new( + db: HyperlaneRocksDB, + whitelist: Arc, + blacklist: Arc, + metrics: MessageProcessorMetrics, + send_channels: HashMap>, + destination_ctxs: HashMap>, + metric_app_contexts: Vec<(MatchingList, String)>, + ) -> Self { + let highest_message_nonce = db + .retrieve_highest_processed_message_nonce() + .ok() + .flatten() + .unwrap_or(0); + Self { + db, + whitelist, + blacklist, + metrics, + send_channels, + destination_ctxs, + metric_app_contexts, + highest_message_nonce, + lowest_message_nonce: highest_message_nonce, + } + } + fn try_get_unprocessed_message(&mut self) -> Result> { loop { // First, see if we can find the message so we can update the gauge. - if let Some(message) = self.db.retrieve_message_by_nonce(self.message_nonce)? { - // Update the latest nonce gauges - self.metrics - .max_last_known_message_nonce_gauge - .set(message.nonce as i64); - if let Some(metrics) = self.metrics.get(message.destination) { - metrics.set(message.nonce as i64); - } - - // If this message has already been processed, on to the next one. - if !self - .db - .retrieve_processed_by_nonce(&self.message_nonce)? - .unwrap_or(false) - { - return Ok(Some(message)); - } else { - debug!(nonce=?self.message_nonce, "Message already marked as processed in DB"); - self.message_nonce += 1; - } + if let (Some(message), new_nonce) = + self.try_get_next_unprocessed(self.highest_message_nonce, 1)? + { + self.highest_message_nonce = new_nonce; + return Ok(Some(message)); + } else if let (Some(message), new_nonce) = + self.try_get_next_unprocessed(self.lowest_message_nonce, -1)? + { + self.lowest_message_nonce = new_nonce; + return Ok(Some(message)); } else { - trace!(nonce=?self.message_nonce, "No message found in DB for nonce"); + trace!(nonce=?self.highest_message_nonce, "No message found in DB for nonce"); return Ok(None); } } } + + fn try_get_next_unprocessed( + &mut self, + mut nonce: u32, + increment: i32, + ) -> Result<(Option, u32)> { + if let Some(message) = self.indexed_message_with_nonce(nonce)? { + self.update_max_nonce_gauge(&message); + + // If this message has already been processed, on to the next one. + if !self.processed_message_with_nonce(nonce)? { + return Ok((Some(message), nonce)); + } else { + debug!(nonce=?nonce, "Message already marked as processed in DB"); + nonce = (nonce as i32 + increment) as u32; + } + } else { + trace!(nonce=?nonce, "No message found in DB for nonce"); + } + Ok((None, nonce)) + } + + fn update_max_nonce_gauge(&self, message: &HyperlaneMessage) { + self.metrics + .max_last_known_message_nonce_gauge + .set(message.nonce as i64); + if let Some(metrics) = self.metrics.get(message.destination) { + metrics.set(message.nonce as i64); + } + } + + fn indexed_message_with_nonce(&self, nonce: u32) -> Result> { + if nonce < 0 { + return Ok(None); + } + let msg = self.db.retrieve_message_by_nonce(nonce)?; + Ok(msg) + } + + fn processed_message_with_nonce(&self, nonce: u32) -> Result { + let processed = self + .db + .retrieve_processed_by_nonce(&nonce)? + .unwrap_or(false); + Ok(processed) + } } #[derive(Debug)] diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index da61a26ce5..568ea17a9e 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -23,6 +23,8 @@ const MESSAGE_DISPATCHED_BLOCK_NUMBER: &str = "message_dispatched_block_number_" const MESSAGE: &str = "message_"; const NONCE_PROCESSED: &str = "nonce_processed_"; const GAS_PAYMENT_BY_SEQUENCE: &str = "gas_payment_by_sequence_"; +const HIGHEST_PROCESSED_IGP_SEQUENCE: &str = "highest_processed_igp_sequence_"; +const HIGHEST_PROCESSED_MESSAGE_NONCE: &str = "highest_processed_message_nonce_"; const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_sequence_for_message_id_v2_"; const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v3_"; const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_"; @@ -108,6 +110,38 @@ impl HyperlaneRocksDB { } } + /// Update the nonce of the highest processed message we're aware of + pub fn update_max_seen_message_nonce(&self, nonce: u32) -> DbResult<()> { + let current_max = self.retrieve_highest_processed_message_nonce()?; + if let Some(current_max) = current_max { + if nonce > current_max { + self.store_highest_processed_message_nonce_number(&Default::default(), &nonce)?; + } + } + Ok(()) + } + + /// Retrieve the nonce of the highest processed message we're aware of + pub fn retrieve_highest_processed_message_nonce(&self) -> DbResult> { + self.retrieve_highest_processed_message_nonce_number(&Default::default()) + } + + /// Update the nonce of the highest processed message we're aware of + pub fn update_max_seen_igp_sequence(&self, nonce: u32) -> DbResult<()> { + let current_max = self.retrieve_highest_processed_igp_sequence()?; + if let Some(current_max) = current_max { + if nonce > current_max { + self.store_highest_processed_igp_sequence_number(&Default::default(), &nonce)?; + } + } + Ok(()) + } + + /// Retrieve the nonce of the highest processed message we're aware of + pub fn retrieve_highest_processed_igp_sequence(&self) -> DbResult> { + self.retrieve_highest_processed_igp_sequence_number(&Default::default()) + } + /// If the provided gas payment, identified by its metadata, has not been /// processed, processes the gas payment and records it as processed. /// Returns whether the gas payment was processed for the first time. @@ -479,3 +513,7 @@ make_store_and_retrieve!( u32, u64 ); +// There's no unit struct Encode/Decode impl, so just use `bool`, have visibility be private (by omitting the first argument), and wrap +// with a function that always uses the `Default::default()` key +make_store_and_retrieve!(, highest_processed_igp_sequence_number, HIGHEST_PROCESSED_IGP_SEQUENCE, bool, u32); +make_store_and_retrieve!(, highest_processed_message_nonce_number, HIGHEST_PROCESSED_MESSAGE_NONCE, bool, u32); From d1c1c1319ff027d0af65692b143ff709a6cda8fe Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 20 May 2024 14:40:30 +0100 Subject: [PATCH 08/10] chore: revert dockerfile changes --- rust/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/Dockerfile b/rust/Dockerfile index 0a9eab1151..7a5c882608 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -27,11 +27,11 @@ RUN \ --mount=id=cargo,type=cache,sharing=locked,target=/usr/src/target \ --mount=id=cargo-home-registry,type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=id=cargo-home-git,type=cache,sharing=locked,target=/usr/local/cargo/git \ - RUSTFLAGS="--cfg tokio_unstable" cargo build --bin validator --bin relayer --bin scraper && \ + RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin validator --bin relayer --bin scraper && \ mkdir -p /release && \ - cp /usr/src/target/debug/validator /release && \ - cp /usr/src/target/debug/relayer /release && \ - cp /usr/src/target/debug/scraper /release + cp /usr/src/target/release/validator /release && \ + cp /usr/src/target/release/relayer /release && \ + cp /usr/src/target/release/scraper /release ## 2: Copy the binaries to release image FROM ubuntu:22.04 From ca88a8684b5ab0490317a083259eb09d96b0035f Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 20 May 2024 14:41:43 +0100 Subject: [PATCH 09/10] silence runtime logs unless `dependencyTrace` is enabled --- rust/hyperlane-base/src/settings/trace/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/hyperlane-base/src/settings/trace/mod.rs b/rust/hyperlane-base/src/settings/trace/mod.rs index 7d64e93dd7..00d9cb4c50 100644 --- a/rust/hyperlane-base/src/settings/trace/mod.rs +++ b/rust/hyperlane-base/src/settings/trace/mod.rs @@ -70,6 +70,7 @@ impl TracingConfig { .with_target("rusoto_core", Level::Info) .with_target("rustls", Level::Info) .with_target("reqwest", Level::Info) + .with_target("runtime", Level::Debug) .with_target("h2", Level::Info) .with_target("tower", Level::Info) .with_target("tendermint", Level::Info) From 34d16c347d7825e0a3cc20223ed16ff7744ef31c Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 20 May 2024 14:53:02 +0100 Subject: [PATCH 10/10] Revert "feat: forward-backward message processor" This reverts commit a70fcbd88ab59ec4e4acdd2bf9e66cc839b79dbb. --- .../relayer/src/merkle_tree/processor.rs | 20 +-- rust/agents/relayer/src/msg/processor.rs | 123 +++++------------- .../src/db/rocks/hyperlane_db.rs | 38 ------ 3 files changed, 34 insertions(+), 147 deletions(-) diff --git a/rust/agents/relayer/src/merkle_tree/processor.rs b/rust/agents/relayer/src/merkle_tree/processor.rs index bddd5382a5..251e8e7c6b 100644 --- a/rust/agents/relayer/src/merkle_tree/processor.rs +++ b/rust/agents/relayer/src/merkle_tree/processor.rs @@ -5,6 +5,7 @@ use std::{ }; use async_trait::async_trait; +use derive_new::new; use eyre::Result; use hyperlane_base::db::HyperlaneRocksDB; use hyperlane_core::{HyperlaneDomain, MerkleTreeInsertion}; @@ -17,10 +18,12 @@ use crate::processor::ProcessorExt; use super::builder::MerkleTreeBuilder; /// Finds unprocessed merkle tree insertions and adds them to the prover sync +#[derive(new)] pub struct MerkleTreeProcessor { db: HyperlaneRocksDB, metrics: MerkleTreeProcessorMetrics, prover_sync: Arc>, + #[new(default)] leaf_index: u32, } @@ -62,23 +65,6 @@ impl ProcessorExt for MerkleTreeProcessor { } impl MerkleTreeProcessor { - pub fn new( - db: HyperlaneRocksDB, - prover_sync: Arc>, - metrics: MerkleTreeProcessorMetrics, - ) -> Self { - Self { - db, - prover_sync, - metrics, - leaf_index: db - .retrieve_highest_processed_igp_sequence() - .ok() - .flatten() - .unwrap_or(0), - } - } - fn next_unprocessed_leaf(&mut self) -> Result> { let leaf = if let Some(insertion) = self .db diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 58f172a1d9..3e3b5aa61d 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -6,6 +6,7 @@ use std::{ }; use async_trait::async_trait; +use derive_new::new; use eyre::Result; use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics}; use hyperlane_core::{HyperlaneDomain, HyperlaneMessage}; @@ -19,6 +20,7 @@ use crate::{processor::ProcessorExt, settings::matching_list::MatchingList}; /// Finds unprocessed messages from an origin and submits then through a channel /// for to the appropriate destination. #[allow(clippy::too_many_arguments)] +#[derive(new)] pub struct MessageProcessor { db: HyperlaneRocksDB, whitelist: Arc, @@ -30,8 +32,8 @@ pub struct MessageProcessor { /// Needed context to send a message for each destination chain destination_ctxs: HashMap>, metric_app_contexts: Vec<(MatchingList, String)>, - highest_message_nonce: u32, - lowest_message_nonce: u32, + #[new(default)] + message_nonce: u32, } impl Debug for MessageProcessor { @@ -39,7 +41,7 @@ impl Debug for MessageProcessor { write!( f, "MessageProcessor {{ whitelist: {:?}, blacklist: {:?}, message_nonce: {:?} }}", - self.whitelist, self.blacklist, self.highest_message_nonce + self.whitelist, self.blacklist, self.message_nonce ) } } @@ -66,28 +68,28 @@ impl ProcessorExt for MessageProcessor { // Skip if not whitelisted. if !self.whitelist.msg_matches(&msg, true) { debug!(?msg, whitelist=?self.whitelist, "Message not whitelisted, skipping"); - self.highest_message_nonce += 1; + self.message_nonce += 1; return Ok(()); } // Skip if the message is blacklisted if self.blacklist.msg_matches(&msg, false) { debug!(?msg, blacklist=?self.blacklist, "Message blacklisted, skipping"); - self.highest_message_nonce += 1; + self.message_nonce += 1; return Ok(()); } // Skip if the message is intended for this origin if destination == self.domain().id() { debug!(?msg, "Message destined for self, skipping"); - self.highest_message_nonce += 1; + self.message_nonce += 1; return Ok(()); } // Skip if the message is intended for a destination we do not service if !self.send_channels.contains_key(&destination) { debug!(?msg, "Message destined for unknown domain, skipping"); - self.highest_message_nonce += 1; + self.message_nonce += 1; return Ok(()); } @@ -104,7 +106,7 @@ impl ProcessorExt for MessageProcessor { app_context, ); self.send_channels[&destination].send(Box::new(pending_msg) as QueueOperation)?; - self.highest_message_nonce += 1; + self.message_nonce += 1; } else { tokio::time::sleep(Duration::from_secs(1)).await; } @@ -113,98 +115,35 @@ impl ProcessorExt for MessageProcessor { } impl MessageProcessor { - pub fn new( - db: HyperlaneRocksDB, - whitelist: Arc, - blacklist: Arc, - metrics: MessageProcessorMetrics, - send_channels: HashMap>, - destination_ctxs: HashMap>, - metric_app_contexts: Vec<(MatchingList, String)>, - ) -> Self { - let highest_message_nonce = db - .retrieve_highest_processed_message_nonce() - .ok() - .flatten() - .unwrap_or(0); - Self { - db, - whitelist, - blacklist, - metrics, - send_channels, - destination_ctxs, - metric_app_contexts, - highest_message_nonce, - lowest_message_nonce: highest_message_nonce, - } - } - fn try_get_unprocessed_message(&mut self) -> Result> { loop { // First, see if we can find the message so we can update the gauge. - if let (Some(message), new_nonce) = - self.try_get_next_unprocessed(self.highest_message_nonce, 1)? - { - self.highest_message_nonce = new_nonce; - return Ok(Some(message)); - } else if let (Some(message), new_nonce) = - self.try_get_next_unprocessed(self.lowest_message_nonce, -1)? - { - self.lowest_message_nonce = new_nonce; - return Ok(Some(message)); - } else { - trace!(nonce=?self.highest_message_nonce, "No message found in DB for nonce"); - return Ok(None); - } - } - } + if let Some(message) = self.db.retrieve_message_by_nonce(self.message_nonce)? { + // Update the latest nonce gauges + self.metrics + .max_last_known_message_nonce_gauge + .set(message.nonce as i64); + if let Some(metrics) = self.metrics.get(message.destination) { + metrics.set(message.nonce as i64); + } - fn try_get_next_unprocessed( - &mut self, - mut nonce: u32, - increment: i32, - ) -> Result<(Option, u32)> { - if let Some(message) = self.indexed_message_with_nonce(nonce)? { - self.update_max_nonce_gauge(&message); - - // If this message has already been processed, on to the next one. - if !self.processed_message_with_nonce(nonce)? { - return Ok((Some(message), nonce)); + // If this message has already been processed, on to the next one. + if !self + .db + .retrieve_processed_by_nonce(&self.message_nonce)? + .unwrap_or(false) + { + return Ok(Some(message)); + } else { + debug!(nonce=?self.message_nonce, "Message already marked as processed in DB"); + self.message_nonce += 1; + } } else { - debug!(nonce=?nonce, "Message already marked as processed in DB"); - nonce = (nonce as i32 + increment) as u32; + trace!(nonce=?self.message_nonce, "No message found in DB for nonce"); + return Ok(None); } - } else { - trace!(nonce=?nonce, "No message found in DB for nonce"); - } - Ok((None, nonce)) - } - - fn update_max_nonce_gauge(&self, message: &HyperlaneMessage) { - self.metrics - .max_last_known_message_nonce_gauge - .set(message.nonce as i64); - if let Some(metrics) = self.metrics.get(message.destination) { - metrics.set(message.nonce as i64); } } - - fn indexed_message_with_nonce(&self, nonce: u32) -> Result> { - if nonce < 0 { - return Ok(None); - } - let msg = self.db.retrieve_message_by_nonce(nonce)?; - Ok(msg) - } - - fn processed_message_with_nonce(&self, nonce: u32) -> Result { - let processed = self - .db - .retrieve_processed_by_nonce(&nonce)? - .unwrap_or(false); - Ok(processed) - } } #[derive(Debug)] diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index 568ea17a9e..da61a26ce5 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -23,8 +23,6 @@ const MESSAGE_DISPATCHED_BLOCK_NUMBER: &str = "message_dispatched_block_number_" const MESSAGE: &str = "message_"; const NONCE_PROCESSED: &str = "nonce_processed_"; const GAS_PAYMENT_BY_SEQUENCE: &str = "gas_payment_by_sequence_"; -const HIGHEST_PROCESSED_IGP_SEQUENCE: &str = "highest_processed_igp_sequence_"; -const HIGHEST_PROCESSED_MESSAGE_NONCE: &str = "highest_processed_message_nonce_"; const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_sequence_for_message_id_v2_"; const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v3_"; const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_"; @@ -110,38 +108,6 @@ impl HyperlaneRocksDB { } } - /// Update the nonce of the highest processed message we're aware of - pub fn update_max_seen_message_nonce(&self, nonce: u32) -> DbResult<()> { - let current_max = self.retrieve_highest_processed_message_nonce()?; - if let Some(current_max) = current_max { - if nonce > current_max { - self.store_highest_processed_message_nonce_number(&Default::default(), &nonce)?; - } - } - Ok(()) - } - - /// Retrieve the nonce of the highest processed message we're aware of - pub fn retrieve_highest_processed_message_nonce(&self) -> DbResult> { - self.retrieve_highest_processed_message_nonce_number(&Default::default()) - } - - /// Update the nonce of the highest processed message we're aware of - pub fn update_max_seen_igp_sequence(&self, nonce: u32) -> DbResult<()> { - let current_max = self.retrieve_highest_processed_igp_sequence()?; - if let Some(current_max) = current_max { - if nonce > current_max { - self.store_highest_processed_igp_sequence_number(&Default::default(), &nonce)?; - } - } - Ok(()) - } - - /// Retrieve the nonce of the highest processed message we're aware of - pub fn retrieve_highest_processed_igp_sequence(&self) -> DbResult> { - self.retrieve_highest_processed_igp_sequence_number(&Default::default()) - } - /// If the provided gas payment, identified by its metadata, has not been /// processed, processes the gas payment and records it as processed. /// Returns whether the gas payment was processed for the first time. @@ -513,7 +479,3 @@ make_store_and_retrieve!( u32, u64 ); -// There's no unit struct Encode/Decode impl, so just use `bool`, have visibility be private (by omitting the first argument), and wrap -// with a function that always uses the `Default::default()` key -make_store_and_retrieve!(, highest_processed_igp_sequence_number, HIGHEST_PROCESSED_IGP_SEQUENCE, bool, u32); -make_store_and_retrieve!(, highest_processed_message_nonce_number, HIGHEST_PROCESSED_MESSAGE_NONCE, bool, u32);