diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml new file mode 100644 index 0000000000..bff29e6e17 --- /dev/null +++ b/rust/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/rust/Cargo.lock b/rust/Cargo.lock index d8b69f4ea4..dd039b7d99 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1347,6 +1347,43 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost 0.12.4", + "prost-types 0.12.4", + "tonic 0.10.2", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types 0.12.4", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.10.2", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -1451,7 +1488,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73c9d2043a9e617b0d602fbc0a0ecd621568edbf3a9774890a6d562389bd8e1c" dependencies = [ "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "tendermint-proto 0.32.2 (registry+https://github.com/rust-lang/crates.io-index)", "tonic 0.9.2", ] @@ -3793,6 +3830,19 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.9" @@ -4116,6 +4166,7 @@ dependencies = [ "bs58 0.5.0", "color-eyre", "config", + "console-subscriber", "convert_case 0.6.0", "derive-new", "derive_builder", @@ -4946,7 +4997,7 @@ dependencies = [ "cosmwasm-std", "osmosis-std-derive", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "schemars", "serde", "serde-cw-value", @@ -6534,16 +6585,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", ] [[package]] name = "prost" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" dependencies = [ "bytes", + "prost-derive 0.12.5", ] [[package]] @@ -6559,6 +6611,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" +dependencies = [ + "anyhow", + "itertools 0.12.0", + "proc-macro2 1.0.76", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "prost-types" version = "0.11.9" @@ -6568,6 +6633,15 @@ dependencies = [ "prost 0.11.9", ] +[[package]] +name = "prost-types" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" +dependencies = [ + "prost 0.12.4", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -6944,6 +7018,7 @@ dependencies = [ "async-trait", "axum", "config", + "console-subscriber", "convert_case 0.6.0", "derive-new", "derive_more", @@ -6968,6 +7043,7 @@ dependencies = [ "strum 0.25.0", "thiserror", "tokio", + "tokio-metrics", "tokio-test", "tracing", "tracing-futures", @@ -7630,6 +7706,7 @@ version = "0.1.0" dependencies = [ "async-trait", "config", + "console-subscriber", "derive_more", "ethers", "eyre", @@ -9700,7 +9777,7 @@ dependencies = [ "num-traits", "once_cell", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "ripemd", "serde", "serde_bytes", @@ -9739,7 +9816,7 @@ dependencies = [ "num-derive 0.3.3", "num-traits", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "serde", "serde_bytes", "subtle-encoding", @@ -9756,7 +9833,7 @@ dependencies = [ "num-derive 0.3.3", "num-traits", "prost 0.11.9", - "prost-types", + "prost-types 0.11.9", "serde", "serde_bytes", "subtle-encoding", @@ -9956,6 +10033,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.5.5", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] @@ -9980,6 +10058,17 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio-stream", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -10217,7 +10306,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost 0.12.3", + "prost 0.12.4", "rustls 0.21.10", "rustls-native-certs 0.6.3", "rustls-pemfile 1.0.4", @@ -10638,6 +10727,7 @@ dependencies = [ "async-trait", "axum", "config", + "console-subscriber", "derive-new", "derive_more", "ethers", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index ba33c80eb2..5909e10bc1 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -65,6 +65,7 @@ bytes = "1" clap = "4" color-eyre = "0.6" config = "0.13.3" +console-subscriber = "0.2.0" convert_case = "0.6" cosmrs = { version = "0.14", default-features = false, features = [ "cosmwasm", @@ -170,7 +171,8 @@ tendermint-rpc = { version = "0.32.0", features = ["http-client", "tokio"] } thiserror = "1.0" time = "0.3" tiny-keccak = "2.0.2" -tokio = { version = "1", features = ["parking_lot"] } +tokio = { version = "1", features = ["parking_lot", "tracing"] } +tokio-metrics = { version = "0.3.1", default-features = false } tokio-test = "0.4" toml_edit = "0.19.14" tonic = "0.9.2" diff --git a/rust/Dockerfile b/rust/Dockerfile index f931c0fa78..7a5c882608 100644 --- a/rust/Dockerfile +++ b/rust/Dockerfile @@ -27,7 +27,7 @@ RUN \ --mount=id=cargo,type=cache,sharing=locked,target=/usr/src/target \ --mount=id=cargo-home-registry,type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=id=cargo-home-git,type=cache,sharing=locked,target=/usr/local/cargo/git \ - cargo build --release --bin validator --bin relayer --bin scraper && \ + RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin validator --bin relayer --bin scraper && \ mkdir -p /release && \ cp /usr/src/target/release/validator /release && \ cp /usr/src/target/release/relayer /release && \ diff --git a/rust/agents/relayer/.cargo/config.toml b/rust/agents/relayer/.cargo/config.toml new file mode 100644 index 0000000000..bff29e6e17 --- /dev/null +++ b/rust/agents/relayer/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/rust/agents/relayer/Cargo.toml b/rust/agents/relayer/Cargo.toml index 59b1c74d6b..5c80952089 100644 --- a/rust/agents/relayer/Cargo.toml +++ b/rust/agents/relayer/Cargo.toml @@ -13,6 +13,7 @@ version.workspace = true async-trait.workspace = true axum.workspace = true config.workspace = true +console-subscriber.workspace = true convert_case.workspace = true derive-new.workspace = true derive_more.workspace = true @@ -32,6 +33,7 @@ serde_json.workspace = true strum.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["rt", "macros", "parking_lot", "rt-multi-thread"] } +tokio-metrics.workspace = true tracing-futures.workspace = true tracing.workspace = true diff --git a/rust/agents/relayer/src/msg/op_submitter.rs b/rust/agents/relayer/src/msg/op_submitter.rs index 4350baeff2..dc30911490 100644 --- a/rust/agents/relayer/src/msg/op_submitter.rs +++ b/rust/agents/relayer/src/msg/op_submitter.rs @@ -4,10 +4,10 @@ use derive_new::new; use futures::future::join_all; use futures_util::future::try_join_all; use prometheus::{IntCounter, IntGaugeVec}; -use tokio::spawn; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::sleep; +use tokio_metrics::TaskMonitor; use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument}; use tracing::{info, warn}; @@ -82,12 +82,18 @@ pub struct SerialSubmitter { metrics: SerialSubmitterMetrics, /// Max batch size for submitting messages max_batch_size: u32, + /// tokio task monitor + task_monitor: TaskMonitor, } impl SerialSubmitter { pub fn spawn(self) -> Instrumented> { let span = info_span!("SerialSubmitter", destination=%self.domain); - spawn(async move { self.run().await }).instrument(span) + let task_monitor = self.task_monitor.clone(); + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + self.run().await + })) + .instrument(span) } async fn run(self) { @@ -97,6 +103,7 @@ impl SerialSubmitter { rx: rx_prepare, retry_rx, max_batch_size, + task_monitor, } = self; let prepare_queue = OpQueue::new( metrics.submitter_queue_length.clone(), @@ -115,32 +122,40 @@ impl SerialSubmitter { ); let tasks = [ - spawn(receive_task( - domain.clone(), - rx_prepare, - prepare_queue.clone(), + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + receive_task(domain.clone(), rx_prepare, prepare_queue.clone()), )), - spawn(prepare_task( - domain.clone(), - prepare_queue.clone(), - submit_queue.clone(), - confirm_queue.clone(), - max_batch_size, - metrics.clone(), + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + prepare_task( + domain.clone(), + prepare_queue.clone(), + submit_queue.clone(), + confirm_queue.clone(), + max_batch_size, + metrics.clone(), + ), )), - spawn(submit_task( - domain.clone(), - submit_queue, - confirm_queue.clone(), - max_batch_size, - metrics.clone(), + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + submit_task( + domain.clone(), + submit_queue, + confirm_queue.clone(), + max_batch_size, + metrics.clone(), + ), )), - spawn(confirm_task( - domain.clone(), - prepare_queue, - confirm_queue, - max_batch_size, - metrics, + tokio::spawn(TaskMonitor::instrument( + &task_monitor, + confirm_task( + domain.clone(), + prepare_queue, + confirm_queue, + max_batch_size, + metrics, + ), )), ]; diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 3aae0d308c..3e3b5aa61d 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -209,6 +209,7 @@ mod test { }, time::sleep, }; + use tokio_metrics::TaskMonitor; fn dummy_processor_metrics(domain_id: u32) -> MessageProcessorMetrics { MessageProcessorMetrics { @@ -367,7 +368,7 @@ mod test { let (message_processor, mut receive_channel) = dummy_message_processor(origin_domain, destination_domain, db); - let processor = Processor::new(Box::new(message_processor)); + let processor = Processor::new(Box::new(message_processor), TaskMonitor::new()); let process_fut = processor.spawn(); let mut pending_messages = vec![]; let pending_message_accumulator = async { diff --git a/rust/agents/relayer/src/processor.rs b/rust/agents/relayer/src/processor.rs index 56dbe3eb8d..c38449cae1 100644 --- a/rust/agents/relayer/src/processor.rs +++ b/rust/agents/relayer/src/processor.rs @@ -5,6 +5,7 @@ use derive_new::new; use eyre::Result; use hyperlane_core::HyperlaneDomain; use tokio::task::JoinHandle; +use tokio_metrics::TaskMonitor; use tracing::{instrument, warn}; #[async_trait] @@ -20,11 +21,15 @@ pub trait ProcessorExt: Send + Debug { #[derive(new)] pub struct Processor { ticker: Box, + task_monitor: TaskMonitor, } impl Processor { pub fn spawn(self) -> JoinHandle<()> { - tokio::spawn(async move { self.main_loop().await }) + let task_monitor = self.task_monitor.clone(); + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + self.main_loop().await + })) } #[instrument(ret, skip(self), level = "info", fields(domain=%self.ticker.domain()))] diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 581620f2f7..0496e38cac 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -25,7 +25,8 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::{info, info_span, instrument::Instrumented, warn, Instrument}; +use tokio_metrics::TaskMonitor; +use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument}; use crate::{ merkle_tree::builder::MerkleTreeBuilder, @@ -79,6 +80,8 @@ pub struct Relayer { // or move them in `core_metrics`, like the validator metrics agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + /// Tokio console server + pub tokio_console_server: Option, } impl Debug for Relayer { @@ -109,6 +112,7 @@ impl BaseAgent for Relayer { core_metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + tokio_console_server: console_subscriber::Server, ) -> Result where Self: Sized, @@ -280,13 +284,26 @@ impl BaseAgent for Relayer { core_metrics, agent_metrics, chain_metrics, + tokio_console_server: Some(tokio_console_server), }) } #[allow(clippy::async_yields_async)] - async fn run(self) { + async fn run(mut self) { let mut tasks = vec![]; + let task_monitor = tokio_metrics::TaskMonitor::new(); + if let Some(tokio_console_server) = self.tokio_console_server.take() { + let console_server = + tokio::spawn(TaskMonitor::instrument(&task_monitor.clone(), async move { + info!("Starting tokio console server"); + if let Err(e) = tokio_console_server.serve().await { + error!(error=?e, "Tokio console server failed to start"); + } + })); + tasks.push(console_server.instrument(info_span!("Tokio console server"))); + } + // run server let mpmc_channel = MpmcChannel::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); let custom_routes = relayer_server::routes(mpmc_channel.sender()); @@ -318,6 +335,7 @@ impl BaseAgent for Relayer { .operation_batch_config() .map(|c| c.max_batch_size) .unwrap_or(1), + task_monitor.clone(), ), ); @@ -334,15 +352,25 @@ impl BaseAgent for Relayer { } for origin in &self.origin_chains { - tasks.push(self.run_message_sync(origin).await); - tasks.push(self.run_interchain_gas_payment_sync(origin).await); - tasks.push(self.run_merkle_tree_hook_syncs(origin).await); + tasks.push(self.run_message_sync(origin, task_monitor.clone()).await); + tasks.push( + self.run_interchain_gas_payment_sync(origin, task_monitor.clone()) + .await, + ); + tasks.push( + self.run_merkle_tree_hook_syncs(origin, task_monitor.clone()) + .await, + ); } // each message process attempts to send messages from a chain for origin in &self.origin_chains { - tasks.push(self.run_message_processor(origin, send_channels.clone())); - tasks.push(self.run_merkle_tree_processor(origin)); + tasks.push(self.run_message_processor( + origin, + send_channels.clone(), + task_monitor.clone(), + )); + tasks.push(self.run_merkle_tree_processor(origin, task_monitor.clone())); } if let Err(err) = try_join_all(tasks).await { @@ -355,22 +383,27 @@ impl BaseAgent for Relayer { } impl Relayer { - async fn run_message_sync(&self, origin: &HyperlaneDomain) -> Instrumented> { + async fn run_message_sync( + &self, + origin: &HyperlaneDomain, + task_monitor: TaskMonitor, + ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self.message_syncs.get(origin).unwrap().clone(); let cursor = contract_sync.cursor(index_settings).await; - tokio::spawn(async move { + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() .sync("dispatched_messages", cursor) .await - }) + })) .instrument(info_span!("MessageSync")) } async fn run_interchain_gas_payment_sync( &self, origin: &HyperlaneDomain, + task_monitor: TaskMonitor, ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self @@ -379,25 +412,31 @@ impl Relayer { .unwrap() .clone(); let cursor = contract_sync.cursor(index_settings).await; - tokio::spawn(async move { contract_sync.clone().sync("gas_payments", cursor).await }) - .instrument(info_span!("IgpSync")) + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + contract_sync.clone().sync("gas_payments", cursor).await + })) + .instrument(info_span!("IgpSync")) } async fn run_merkle_tree_hook_syncs( &self, origin: &HyperlaneDomain, + task_monitor: TaskMonitor, ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); let cursor = contract_sync.cursor(index_settings).await; - tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await }) - .instrument(info_span!("MerkleTreeHookSync")) + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { + contract_sync.clone().sync("merkle_tree_hook", cursor).await + })) + .instrument(info_span!("MerkleTreeHookSync")) } fn run_message_processor( &self, origin: &HyperlaneDomain, send_channels: HashMap>, + task_monitor: TaskMonitor, ) -> Instrumented> { let metrics = MessageProcessorMetrics::new( &self.core.metrics, @@ -431,12 +470,16 @@ impl Relayer { ); let span = info_span!("MessageProcessor", origin=%message_processor.domain()); - let processor = Processor::new(Box::new(message_processor)); + let processor = Processor::new(Box::new(message_processor), task_monitor.clone()); processor.spawn().instrument(span) } - fn run_merkle_tree_processor(&self, origin: &HyperlaneDomain) -> Instrumented> { + fn run_merkle_tree_processor( + &self, + origin: &HyperlaneDomain, + task_monitor: TaskMonitor, + ) -> Instrumented> { let metrics = MerkleTreeProcessorMetrics::new(); let merkle_tree_processor = MerkleTreeProcessor::new( self.dbs.get(origin).unwrap().clone(), @@ -445,7 +488,7 @@ impl Relayer { ); let span = info_span!("MerkleTreeProcessor", origin=%merkle_tree_processor.domain()); - let processor = Processor::new(Box::new(merkle_tree_processor)); + let processor = Processor::new(Box::new(merkle_tree_processor), task_monitor.clone()); processor.spawn().instrument(span) } @@ -457,6 +500,7 @@ impl Relayer { receiver: UnboundedReceiver, retry_receiver_channel: MpmcReceiver, batch_size: u32, + task_monitor: TaskMonitor, ) -> Instrumented> { let serial_submitter = SerialSubmitter::new( destination.clone(), @@ -464,10 +508,11 @@ impl Relayer { retry_receiver_channel, SerialSubmitterMetrics::new(&self.core.metrics, destination), batch_size, + task_monitor.clone(), ); let span = info_span!("SerialSubmitter", destination=%destination); let destination = destination.clone(); - tokio::spawn(async move { + tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { // Propagate task panics serial_submitter.spawn().await.unwrap_or_else(|err| { panic!( @@ -475,7 +520,7 @@ impl Relayer { destination, err ) }); - }) + })) .instrument(span) } } diff --git a/rust/agents/scraper/Cargo.toml b/rust/agents/scraper/Cargo.toml index 587a56b8dd..2348135731 100644 --- a/rust/agents/scraper/Cargo.toml +++ b/rust/agents/scraper/Cargo.toml @@ -12,6 +12,7 @@ version.workspace = true [dependencies] async-trait.workspace = true config.workspace = true +console-subscriber.workspace = true derive_more.workspace = true ethers.workspace = true eyre.workspace = true diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index cc113cacfd..d713432819 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -44,6 +44,7 @@ impl BaseAgent for Scraper { metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + _tokio_console_server: console_subscriber::Server, ) -> eyre::Result where Self: Sized, diff --git a/rust/agents/validator/Cargo.toml b/rust/agents/validator/Cargo.toml index 98a5fe6f83..e9e66eb303 100644 --- a/rust/agents/validator/Cargo.toml +++ b/rust/agents/validator/Cargo.toml @@ -13,6 +13,7 @@ version.workspace = true async-trait.workspace = true axum.workspace = true config.workspace = true +console-subscriber.workspace = true derive_more.workspace = true derive-new.workspace = true ethers.workspace = true diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 8d28980098..043ac9249d 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -63,6 +63,7 @@ impl BaseAgent for Validator { metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + _tokio_console_server: console_subscriber::Server, ) -> Result where Self: Sized, diff --git a/rust/hyperlane-base/Cargo.toml b/rust/hyperlane-base/Cargo.toml index 97d84b6221..9f401b399f 100644 --- a/rust/hyperlane-base/Cargo.toml +++ b/rust/hyperlane-base/Cargo.toml @@ -15,6 +15,7 @@ axum.workspace = true bs58.workspace = true color-eyre = { workspace = true, optional = true } config.workspace = true +console-subscriber.workspace = true convert_case.workspace = true derive_builder.workspace = true derive-new.workspace = true diff --git a/rust/hyperlane-base/src/agent.rs b/rust/hyperlane-base/src/agent.rs index 5f6b504e9a..153526d584 100644 --- a/rust/hyperlane-base/src/agent.rs +++ b/rust/hyperlane-base/src/agent.rs @@ -44,6 +44,7 @@ pub trait BaseAgent: Send + Sync + Debug { metrics: Arc, agent_metrics: AgentMetrics, chain_metrics: ChainMetrics, + tokio_console_server: console_subscriber::Server, ) -> Result where Self: Sized; @@ -75,10 +76,17 @@ pub async fn agent_main() -> Result<()> { let core_settings: &Settings = settings.as_ref(); let metrics = settings.as_ref().metrics(A::AGENT_NAME)?; - core_settings.tracing.start_tracing(&metrics)?; + let tokio_server = core_settings.tracing.start_tracing(&metrics)?; let agent_metrics = create_agent_metrics(&metrics)?; let chain_metrics = create_chain_metrics(&metrics)?; - let agent = A::from_settings(settings, metrics.clone(), agent_metrics, chain_metrics).await?; + let agent = A::from_settings( + settings, + metrics.clone(), + agent_metrics, + chain_metrics, + tokio_server, + ) + .await?; // This await will only end if a panic happens. We won't crash, but instead gracefully shut down agent.run().await; diff --git a/rust/hyperlane-base/src/settings/trace/mod.rs b/rust/hyperlane-base/src/settings/trace/mod.rs index b0640f36ee..00d9cb4c50 100644 --- a/rust/hyperlane-base/src/settings/trace/mod.rs +++ b/rust/hyperlane-base/src/settings/trace/mod.rs @@ -60,7 +60,7 @@ pub struct TracingConfig { impl TracingConfig { /// Attempt to instantiate and register a tracing subscriber setup from /// settings. - pub fn start_tracing(&self, metrics: &CoreMetrics) -> Result<()> { + pub fn start_tracing(&self, metrics: &CoreMetrics) -> Result { let mut target_layer = Targets::new().with_default(self.level); if self.level < Level::DependencyTrace { @@ -70,6 +70,7 @@ impl TracingConfig { .with_target("rusoto_core", Level::Info) .with_target("rustls", Level::Info) .with_target("reqwest", Level::Info) + .with_target("runtime", Level::Debug) .with_target("h2", Level::Info) .with_target("tower", Level::Info) .with_target("tendermint", Level::Info) @@ -85,13 +86,15 @@ impl TracingConfig { let fmt_layer: LogOutputLayer<_> = self.fmt.into(); let err_layer = tracing_error::ErrorLayer::default(); + let (tokio_layer, tokio_server) = console_subscriber::ConsoleLayer::new(); let subscriber = tracing_subscriber::Registry::default() + .with(tokio_layer) .with(target_layer) .with(TimeSpanLifetime::new(metrics)) .with(fmt_layer) .with(err_layer); subscriber.try_init()?; - Ok(()) + Ok(tokio_server) } }