Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Relayer tokio task instrumentation #3760

Merged
merged 15 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
rustflags = ["--cfg", "tokio_unstable"]
108 changes: 99 additions & 9 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bytes = "1"
clap = "4"
color-eyre = "0.6"
config = "0.13.3"
console-subscriber = "0.2.0"
convert_case = "0.6"
cosmrs = { version = "0.14", default-features = false, features = [
"cosmwasm",
Expand Down Expand Up @@ -170,7 +171,8 @@ tendermint-rpc = { version = "0.32.0", features = ["http-client", "tokio"] }
thiserror = "1.0"
time = "0.3"
tiny-keccak = "2.0.2"
tokio = { version = "1", features = ["parking_lot"] }
tokio = { version = "1", features = ["parking_lot", "tracing"] }
tokio-metrics = { version = "0.3.1", default-features = false }
tokio-test = "0.4"
toml_edit = "0.19.14"
tonic = "0.9.2"
Expand Down
2 changes: 1 addition & 1 deletion rust/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ RUN \
--mount=id=cargo,type=cache,sharing=locked,target=/usr/src/target \
--mount=id=cargo-home-registry,type=cache,sharing=locked,target=/usr/local/cargo/registry \
--mount=id=cargo-home-git,type=cache,sharing=locked,target=/usr/local/cargo/git \
cargo build --release --bin validator --bin relayer --bin scraper && \
RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin validator --bin relayer --bin scraper && \
mkdir -p /release && \
cp /usr/src/target/release/validator /release && \
cp /usr/src/target/release/relayer /release && \
Expand Down
2 changes: 2 additions & 0 deletions rust/agents/relayer/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
2 changes: 2 additions & 0 deletions rust/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ version.workspace = true
async-trait.workspace = true
axum.workspace = true
config.workspace = true
console-subscriber.workspace = true
convert_case.workspace = true
derive-new.workspace = true
derive_more.workspace = true
Expand All @@ -32,6 +33,7 @@ serde_json.workspace = true
strum.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "macros", "parking_lot", "rt-multi-thread"] }
tokio-metrics.workspace = true
tracing-futures.workspace = true
tracing.workspace = true

Expand Down
65 changes: 40 additions & 25 deletions rust/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use derive_new::new;
use futures::future::join_all;
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGaugeVec};
use tokio::spawn;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tokio_metrics::TaskMonitor;
use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument};
use tracing::{info, warn};

Expand Down Expand Up @@ -82,12 +82,18 @@ pub struct SerialSubmitter {
metrics: SerialSubmitterMetrics,
/// Max batch size for submitting messages
max_batch_size: u32,
/// tokio task monitor
task_monitor: TaskMonitor,
}

impl SerialSubmitter {
pub fn spawn(self) -> Instrumented<JoinHandle<()>> {
let span = info_span!("SerialSubmitter", destination=%self.domain);
spawn(async move { self.run().await }).instrument(span)
let task_monitor = self.task_monitor.clone();
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
self.run().await
}))
.instrument(span)
}

async fn run(self) {
Expand All @@ -97,6 +103,7 @@ impl SerialSubmitter {
rx: rx_prepare,
retry_rx,
max_batch_size,
task_monitor,
} = self;
let prepare_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
Expand All @@ -115,32 +122,40 @@ impl SerialSubmitter {
);

let tasks = [
spawn(receive_task(
domain.clone(),
rx_prepare,
prepare_queue.clone(),
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
receive_task(domain.clone(), rx_prepare, prepare_queue.clone()),
)),
spawn(prepare_task(
domain.clone(),
prepare_queue.clone(),
submit_queue.clone(),
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
prepare_task(
domain.clone(),
prepare_queue.clone(),
submit_queue.clone(),
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
),
)),
spawn(submit_task(
domain.clone(),
submit_queue,
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
submit_task(
domain.clone(),
submit_queue,
confirm_queue.clone(),
max_batch_size,
metrics.clone(),
),
)),
spawn(confirm_task(
domain.clone(),
prepare_queue,
confirm_queue,
max_batch_size,
metrics,
tokio::spawn(TaskMonitor::instrument(
&task_monitor,
confirm_task(
domain.clone(),
prepare_queue,
confirm_queue,
max_batch_size,
metrics,
),
)),
];

Expand Down
Loading
Loading