Skip to content

Commit

Permalink
feat: alert instead of crashing relayer when building cursors (#4811)
Browse files Browse the repository at this point in the history
### Description

- we should make sure there are no `unwrap`s outside a `tokio::spawn`,
otherwise they'll take down the entire relayer. The offenders have so
far been the indexing cursors, so this PR just logs an error and sets a
new metric flag instead of panicking: `critical_error`, an `IntGauge`
type because prometheus doesn't have boolean gauges. For now this is
only used for origin chains, but I envision this as being set when a
critical error occurs in the submitter too, in the future
- before setting the flag, cursor instantiation is now retried 10 times
(to avoid short lived RPC hiccups)

### Drive-by changes
Removes usage of `Builder` derive macro in metric types, since they're
not complex enough to warrant it

### Related issues

### Backward compatibility

Yes

### Testing

the existing e2e
  • Loading branch information
daniel-savu authored Nov 5, 2024
1 parent a82b4b4 commit b72c881
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 49 deletions.
1 change: 0 additions & 1 deletion rust/main/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 64 additions & 13 deletions rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use hyperlane_base::{
broadcast::BroadcastMpscSender,
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, MetricsUpdater},
settings::ChainConf,
settings::{ChainConf, IndexSettings},
AgentMetadata, BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics,
HyperlaneAgentCore, SyncOptions,
};
use hyperlane_core::{
rpc_clients::call_and_retry_n_times, ChainCommunicationError, ContractSyncCursor,
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, QueueOperation,
H512, U256,
};
Expand Down Expand Up @@ -50,6 +51,9 @@ use crate::{
};
use crate::{processor::Processor, server::ENDPOINT_MESSAGES_QUEUE_SIZE};

const CURSOR_BUILDING_ERROR: &str = "Error building cursor for origin";
const CURSOR_INSTANTIATION_ATTEMPTS: usize = 10;

#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
struct ContextKey {
origin: u32,
Expand Down Expand Up @@ -354,6 +358,7 @@ impl BaseAgent for Relayer {
}

for origin in &self.origin_chains {
self.chain_metrics.set_critical_error(origin.name(), false);
let maybe_broadcaster = self
.message_syncs
.get(origin)
Expand Down Expand Up @@ -412,17 +417,51 @@ impl BaseAgent for Relayer {
}

impl Relayer {
fn record_critical_error(
&self,
origin: &HyperlaneDomain,
err: ChainCommunicationError,
message: &str,
) {
error!(?err, origin=?origin, "{message}");
self.chain_metrics.set_critical_error(origin.name(), true);
}

async fn instantiate_cursor_with_retries<T: 'static>(
contract_sync: Arc<dyn ContractSyncer<T>>,
index_settings: IndexSettings,
) -> Result<Box<dyn ContractSyncCursor<T>>, ChainCommunicationError> {
call_and_retry_n_times(
|| {
let contract_sync = contract_sync.clone();
let index_settings = index_settings.clone();
Box::pin(async move {
let cursor = contract_sync.cursor(index_settings).await?;
Ok(cursor)
})
},
CURSOR_INSTANTIATION_ATTEMPTS,
)
.await
}

async fn run_message_sync(
&self,
origin: &HyperlaneDomain,
task_monitor: TaskMonitor,
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
let contract_sync = self.message_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}"));
let cursor_instantiation_result =
Self::instantiate_cursor_with_retries(contract_sync.clone(), index_settings.clone())
.await;
let cursor = match cursor_instantiation_result {
Ok(cursor) => cursor,
Err(err) => {
self.record_critical_error(origin, err, CURSOR_BUILDING_ERROR);
return tokio::spawn(async {}).instrument(info_span!("MessageSync"));
}
};
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()
Expand All @@ -444,10 +483,16 @@ impl Relayer {
.get(origin)
.unwrap()
.clone();
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}"));
let cursor_instantiation_result =
Self::instantiate_cursor_with_retries(contract_sync.clone(), index_settings.clone())
.await;
let cursor = match cursor_instantiation_result {
Ok(cursor) => cursor,
Err(err) => {
self.record_critical_error(origin, err, CURSOR_BUILDING_ERROR);
return tokio::spawn(async {}).instrument(info_span!("IgpSync"));
}
};
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()
Expand All @@ -468,10 +513,16 @@ impl Relayer {
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}"));
let cursor_instantiation_result =
Self::instantiate_cursor_with_retries(contract_sync.clone(), index_settings.clone())
.await;
let cursor = match cursor_instantiation_result {
Ok(cursor) => cursor,
Err(err) => {
self.record_critical_error(origin, err, CURSOR_BUILDING_ERROR);
return tokio::spawn(async {}).instrument(info_span!("MerkleTreeHookSync"));
}
};
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()
Expand Down
1 change: 0 additions & 1 deletion rust/main/hyperlane-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ color-eyre = { workspace = true, optional = true }
config.workspace = true
console-subscriber.workspace = true
convert_case.workspace = true
derive_builder.workspace = true
derive-new.workspace = true
ed25519-dalek.workspace = true
ethers.workspace = true
Expand Down
7 changes: 3 additions & 4 deletions rust/main/hyperlane-base/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use hyperlane_core::config::*;
use tracing::info;

use crate::{
create_chain_metrics,
metrics::{create_agent_metrics, AgentMetrics, CoreMetrics},
metrics::{AgentMetrics, CoreMetrics},
settings::Settings,
ChainMetrics,
};
Expand Down Expand Up @@ -88,8 +87,8 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {

let metrics = settings.as_ref().metrics(A::AGENT_NAME)?;
let tokio_server = core_settings.tracing.start_tracing(&metrics)?;
let agent_metrics = create_agent_metrics(&metrics)?;
let chain_metrics = create_chain_metrics(&metrics)?;
let agent_metrics = AgentMetrics::new(&metrics)?;
let chain_metrics = ChainMetrics::new(&metrics)?;
let agent = A::from_settings(
agent_metadata,
settings,
Expand Down
88 changes: 58 additions & 30 deletions rust/main/hyperlane-base/src/metrics/agent_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::sync::Arc;
use std::time::Duration;

use derive_builder::Builder;
use eyre::Result;
use hyperlane_core::metrics::agent::decimals_by_protocol;
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
Expand Down Expand Up @@ -45,8 +44,14 @@ pub const GAS_PRICE_LABELS: &[&str] = &["chain"];
pub const GAS_PRICE_HELP: &str =
"Tracks the current gas price of the chain, in the lowest denomination (e.g. wei)";

/// Expected label names for the `critical_error` metric.
pub const CRITICAL_ERROR_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const CRITICAL_ERROR_HELP: &str =
"Boolean marker for critical errors on a chain, signalling loss of liveness";

/// Agent-specific metrics
#[derive(Clone, Builder, Debug)]
#[derive(Clone, Debug)]
pub struct AgentMetrics {
/// Current balance of native tokens for the
/// wallet address.
Expand All @@ -57,47 +62,74 @@ pub struct AgentMetrics {
/// - `token_address`: Address of the token.
/// - `token_symbol`: Symbol of the token.
/// - `token_name`: Full name of the token.
#[builder(setter(into, strip_option), default)]
wallet_balance: Option<GaugeVec>,
}

pub(crate) fn create_agent_metrics(metrics: &CoreMetrics) -> Result<AgentMetrics> {
Ok(AgentMetricsBuilder::default()
.wallet_balance(metrics.new_gauge(
"wallet_balance",
WALLET_BALANCE_HELP,
WALLET_BALANCE_LABELS,
)?)
.build()?)
impl AgentMetrics {
pub(crate) fn new(metrics: &CoreMetrics) -> Result<AgentMetrics> {
let agent_metrics = AgentMetrics {
wallet_balance: Some(metrics.new_gauge(
"wallet_balance",
WALLET_BALANCE_HELP,
WALLET_BALANCE_LABELS,
)?),
};
Ok(agent_metrics)
}
}

/// Chain-specific metrics
#[derive(Clone, Builder, Debug)]
#[derive(Clone, Debug)]
pub struct ChainMetrics {
/// Tracks the current block height of the chain.
/// - `chain`: the chain name (or ID if the name is unknown) of the chain
/// the block number refers to.
#[builder(setter(into))]
pub block_height: IntGaugeVec,

/// Tracks the current gas price of the chain. Uses the base_fee_per_gas if
/// available or else sets this to none.
/// TODO: use the median of the transactions.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the gas price refers to.
#[builder(setter(into, strip_option), default)]
pub gas_price: Option<GaugeVec>,

/// Boolean marker for critical errors on a chain, signalling loss of liveness.
critical_error: IntGaugeVec,
}

pub(crate) fn create_chain_metrics(metrics: &CoreMetrics) -> Result<ChainMetrics> {
Ok(ChainMetricsBuilder::default()
.block_height(metrics.new_int_gauge(
"block_height",
BLOCK_HEIGHT_HELP,
BLOCK_HEIGHT_LABELS,
)?)
.gas_price(metrics.new_gauge("gas_price", GAS_PRICE_HELP, GAS_PRICE_LABELS)?)
.build()?)
impl ChainMetrics {
pub(crate) fn new(metrics: &CoreMetrics) -> Result<ChainMetrics> {
let block_height_metrics =
metrics.new_int_gauge("block_height", BLOCK_HEIGHT_HELP, BLOCK_HEIGHT_LABELS)?;
let gas_price_metrics = metrics.new_gauge("gas_price", GAS_PRICE_HELP, GAS_PRICE_LABELS)?;
let critical_error_metrics =
metrics.new_int_gauge("critical_error", CRITICAL_ERROR_HELP, CRITICAL_ERROR_LABELS)?;
let chain_metrics = ChainMetrics {
block_height: block_height_metrics,
gas_price: Some(gas_price_metrics),
critical_error: critical_error_metrics,
};
Ok(chain_metrics)
}

pub(crate) fn set_gas_price(&self, chain: &str, price: f64) {
if let Some(gas_price) = &self.gas_price {
gas_price.with(&hashmap! { "chain" => chain }).set(price);
}
}

pub(crate) fn set_block_height(&self, chain: &str, height: i64) {
self.block_height
.with(&hashmap! { "chain" => chain })
.set(height);
}

/// Flag that a critical error has occurred on the chain
pub fn set_critical_error(&self, chain: &str, is_critical: bool) {
self.critical_error
.with(&hashmap! { "chain" => chain })
.set(is_critical as i64);
}
}

/// Configuration for the prometheus middleware. This can be loaded via serde.
Expand Down Expand Up @@ -174,8 +206,6 @@ impl MetricsUpdater {
}

async fn update_block_details(&self) {
let block_height = self.chain_metrics.block_height.clone();
let gas_price = self.chain_metrics.gas_price.clone();
if let HyperlaneDomain::Unknown { .. } = self.conf.domain {
return;
};
Expand All @@ -195,10 +225,8 @@ impl MetricsUpdater {

let height = chain_metrics.latest_block.number as i64;
trace!(chain, height, "Fetched block height for metrics");
block_height
.with(&hashmap! { "chain" => chain })
.set(height);
if let Some(gas_price) = gas_price {
self.chain_metrics.set_block_height(chain, height);
if self.chain_metrics.gas_price.is_some() {
let protocol = self.conf.domain.domain_protocol();
let decimals_scale = 10f64.powf(decimals_by_protocol(protocol).into());
let gas = u256_as_scaled_f64(chain_metrics.min_gas_price.unwrap_or_default(), protocol)
Expand All @@ -208,7 +236,7 @@ impl MetricsUpdater {
gas = format!("{gas:.2}"),
"Gas price updated for chain (using lowest denomination)"
);
gas_price.with(&hashmap! { "chain" => chain }).set(gas);
self.chain_metrics.set_gas_price(chain, gas);
}
}

Expand Down

0 comments on commit b72c881

Please sign in to comment.