Skip to content

Commit

Permalink
feat: Relayer tokio task instrumentation (#3760)
Browse files Browse the repository at this point in the history
### Description

- Started off adding tokio-metrics but then realised those are quite
general, so while we do have instrumentation it's not exposed in our
metrics endpoint
- switched to adding
[tokio-console](https://github.com/tokio-rs/console/tree/main), which
does give insight into the lifetime of specific tasks, so we can check
which ones take up a long time during relayer startup. These are only
visible at the `dependencyTrace` log level, so don't affect performance
in the `hyperlane` context.

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

- Helps debug
#3454 and any
future performance issues
- Does half the work for
#3239 (still
need to expose these in the metrics endpoint and import the grafana
template)

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
  • Loading branch information
daniel-savu authored May 22, 2024
1 parent 1f9a3e9 commit 27aabf2
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 61 deletions.
2 changes: 2 additions & 0 deletions rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
108 changes: 99 additions & 9 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bytes = "1"
clap = "4"
color-eyre = "0.6"
config = "0.13.3"
console-subscriber = "0.2.0"
convert_case = "0.6"
cosmrs = { version = "0.14", default-features = false, features = [
"cosmwasm",
Expand Down Expand Up @@ -170,7 +171,8 @@ tendermint-rpc = { version = "0.32.0", features = ["http-client", "tokio"] }
thiserror = "1.0"
time = "0.3"
tiny-keccak = "2.0.2"
tokio = { version = "1", features = ["parking_lot"] }
tokio = { version = "1", features = ["parking_lot", "tracing"] }
tokio-metrics = { version = "0.3.1", default-features = false }
tokio-test = "0.4"
toml_edit = "0.19.14"
tonic = "0.9.2"
Expand Down
2 changes: 1 addition & 1 deletion rust/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ RUN \
--mount=id=cargo,type=cache,sharing=locked,target=/usr/src/target \
--mount=id=cargo-home-registry,type=cache,sharing=locked,target=/usr/local/cargo/registry \
--mount=id=cargo-home-git,type=cache,sharing=locked,target=/usr/local/cargo/git \
cargo build --release --bin validator --bin relayer --bin scraper && \
RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin validator --bin relayer --bin scraper && \
mkdir -p /release && \
cp /usr/src/target/release/validator /release && \
cp /usr/src/target/release/relayer /release && \
Expand Down
2 changes: 2 additions & 0 deletions rust/agents/relayer/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
2 changes: 2 additions & 0 deletions rust/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ version.workspace = true
async-trait.workspace = true
axum.workspace = true
config.workspace = true
console-subscriber.workspace = true
convert_case.workspace = true
derive-new.workspace = true
derive_more.workspace = true
Expand All @@ -32,6 +33,7 @@ serde_json.workspace = true
strum.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "macros", "parking_lot", "rt-multi-thread"] }
tokio-metrics.workspace = true
tracing-futures.workspace = true
tracing.workspace = true

Expand Down
65 changes: 40 additions & 25 deletions rust/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use derive_new::new;
use futures::future::join_all;
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGaugeVec};
use tokio::spawn;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_metrics::TaskMonitor;
use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument};
use tracing::{info, warn};

Expand Down Expand Up @@ -82,12 +82,18 @@ pub struct SerialSubmitter {
metrics: SerialSubmitterMetrics,
/// Max batch size for submitting messages
max_batch_size: u32,
/// tokio task monitor
task_monitor: TaskMonitor,
}

impl SerialSubmitter {
pub fn spawn(self) -> Instrumented<JoinHandle<()>> {
let span = info_span!("SerialSubmitter", destination=%self.domain);
spawn(async move { self.run().await }).instrument(span)
let task_monitor = self.task_monitor.clone();
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
self.run().await
}))
.instrument(span)
}

async fn run(self) {
Expand All @@ -97,6 +103,7 @@ impl SerialSubmitter {
rx: rx_prepare,
retry_rx,
max_batch_size,
task_monitor,
} = self;
let prepare_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
Expand All @@ -115,32 +122,40 @@ impl SerialSubmitter {
);

let tasks = [
spawn(receive_task(
domain.clone(),
rx_prepare,
prepare_queue.clone(),
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
receive_task(domain.clone(), rx_prepare, prepare_queue.clone()),
)),
spawn(prepare_task(
domain.clone(),
prepare_queue.clone(),
submit_queue.clone(),
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
prepare_task(
domain.clone(),
prepare_queue.clone(),
submit_queue.clone(),
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
),
)),
spawn(submit_task(
domain.clone(),
submit_queue,
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
submit_task(
domain.clone(),
submit_queue,
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
),
)),
spawn(confirm_task(
domain.clone(),
prepare_queue,
confirm_queue,
max_batch_size,
metrics,
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
confirm_task(
domain.clone(),
prepare_queue,
confirm_queue,
max_batch_size,
metrics,
),
)),
];

Expand Down
Loading

0 comments on commit 27aabf2

Please sign in to comment.