Skip to content

Commit

Permalink
Extend relayer metrics (#306)
Browse files Browse the repository at this point in the history
Co-authored-by: drewstone <drewstone329@gmail.com>
  • Loading branch information
salman01zp and drewstone authored Dec 27, 2022
1 parent aa643f2 commit 92e3239
Show file tree
Hide file tree
Showing 39 changed files with 944 additions and 576 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion crates/event-watcher-traits/src/evm/bridge_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use tokio::sync::Mutex;

use super::{event_watcher::EventWatcher, *};

/// A Bridge Watcher is a trait for Bridge contracts that not specific for watching events from that contract,
Expand Down Expand Up @@ -51,7 +53,7 @@ where
client: Arc<providers::Provider<providers::Http>>,
store: Arc<Self::Store>,
contract: Self::Contract,
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
let backoff = backoff::backoff::Constant::new(Duration::from_secs(1));
let task = || async {
Expand Down Expand Up @@ -87,14 +89,19 @@ where
// this a transient error, so we will retry again.
tracing::warn!("Restarting bridge event watcher ...");
// metric for when the bridge watcher enters back off
let metrics = metrics.lock().await;
metrics.bridge_watcher_back_off.inc();
drop(metrics);
return Err(backoff::Error::transient(e));
}
}
}
};
// Bridge watcher backoff metric
let metrics = metrics.lock().await;
metrics.bridge_watcher_back_off.inc();
drop(metrics);

backoff::future::retry(backoff, task).await?;
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions crates/event-watcher-traits/src/evm/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use tokio::sync::Mutex;
use webb_relayer_utils::retry;

use super::*;
Expand Down Expand Up @@ -265,7 +266,7 @@ pub trait EventHandler {
store: Arc<Self::Store>,
contract: &Self::Contract,
(event, log): (Self::Events, contract::LogMeta),
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()>;
}

Expand All @@ -290,7 +291,7 @@ pub trait EventHandlerWithRetry: EventHandler {
contract: &Self::Contract,
(event, log): (Self::Events, contract::LogMeta),
backoff: impl backoff::backoff::Backoff + Send + Sync + 'static,
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
let wrapped_task = || {
self.handle_event(
Expand Down
14 changes: 9 additions & 5 deletions crates/event-watcher-traits/src/substrate/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use tokio::sync::Mutex;
use webb_relayer_utils::metric;

use super::*;

/// Represents a Substrate event watcher.
Expand Down Expand Up @@ -40,7 +43,7 @@ pub trait SubstrateEventWatcher {
store: Arc<Self::Store>,
client: Arc<Self::Client>,
(event, block_number): (Self::FilteredEvent, BlockNumberOf<Self>),
metrics: Arc<webb_relayer_utils::metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()>;

/// Returns a task that should be running in the background
Expand All @@ -58,10 +61,10 @@ pub trait SubstrateEventWatcher {
chain_id: u32,
client: Arc<Self::Client>,
store: Arc<Self::Store>,
metrics: Arc<webb_relayer_utils::metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
let backoff = backoff::backoff::Constant::new(Duration::from_secs(1));

let metrics_clone = metrics.clone();
let task = || async {
let mut instant = std::time::Instant::now();
let step = 1u64;
Expand Down Expand Up @@ -160,7 +163,7 @@ pub trait SubstrateEventWatcher {
store.clone(),
client.clone(),
(event, block_number),
metrics.clone(),
metrics_clone.clone(),
)
.await;
match result {
Expand Down Expand Up @@ -223,7 +226,8 @@ pub trait SubstrateEventWatcher {
}
};
// Bridge watcher backoff metric
metrics.bridge_watcher_back_off.inc();
metrics.lock().await.bridge_watcher_back_off.inc();
drop(metrics);
backoff::future::retry(backoff, task).await?;
Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion crates/event-watcher-traits/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
// limitations under the License.

use std::sync::Arc;
use tokio::sync::Mutex;
use webb::substrate::dkg_runtime::api::system;
use webb::substrate::subxt::{OnlineClient, PolkadotConfig};
use webb::substrate::{dkg_runtime, subxt};
use webb_relayer_context::RelayerContext;
use webb_relayer_store::sled::SledStore;
use webb_relayer_utils::metric;

use crate::substrate::BlockNumberOf;
use crate::SubstrateEventWatcher;
Expand All @@ -43,7 +45,7 @@ impl SubstrateEventWatcher for RemarkedEventWatcher {
_store: Arc<Self::Store>,
_client: Arc<Self::Client>,
(event, block_number): (Self::FilteredEvent, BlockNumberOf<Self>),
_metrics: Arc<webb_relayer_utils::metric::Metrics>,
_metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
tracing::debug!(
"Received `Remarked` Event: {:?} at block number: #{}",
Expand Down
5 changes: 3 additions & 2 deletions crates/proposal-signing-backends/src/dkg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;
use futures::StreamExt;
use tokio::sync::Mutex;
use webb::substrate::dkg_runtime::api::runtime_types::webb_proposals::header::{TypedChainId, ResourceId};
use webb::substrate::dkg_runtime::api::runtime_types::webb_proposals::nonce::Nonce;
use webb::substrate::subxt::{OnlineClient, PolkadotConfig};
Expand Down Expand Up @@ -79,7 +80,7 @@ impl super::ProposalSigningBackend for DkgProposalSigningBackend {
async fn handle_proposal(
&self,
proposal: &(impl ProposalTrait + Sync + Send + 'static),
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
let tx_api = RuntimeApi::tx().dkg_proposals();
let resource_id = proposal.header().resource_id();
Expand Down Expand Up @@ -139,8 +140,8 @@ impl super::ProposalSigningBackend for DkgProposalSigningBackend {
let maybe_success = v.wait_for_success().await;
match maybe_success {
Ok(_events) => {
metrics.lock().await.proposals_signed.inc();
tracing::debug!("tx finalized");
metrics.proposals_signed.inc();
}
Err(err) => {
tracing::error!(error = %err, "tx failed");
Expand Down
3 changes: 2 additions & 1 deletion crates/proposal-signing-backends/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//! - `DKGProposalSigningBackend`: This is the actual proposal signing backend that is used in production.

use std::sync::Arc;
use tokio::sync::Mutex;
use webb_proposals::ProposalTrait;

/// A module to handle proposals
Expand Down Expand Up @@ -62,6 +63,6 @@ pub trait ProposalSigningBackend {
async fn handle_proposal(
&self,
proposal: &(impl ProposalTrait + Sync + Send + 'static),
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()>;
}
5 changes: 3 additions & 2 deletions crates/proposal-signing-backends/src/mocked.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ethereum_types::H256;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use typed_builder::TypedBuilder;
use webb::evm::ethers::core::k256::SecretKey;
use webb::evm::ethers::prelude::*;
Expand Down Expand Up @@ -59,7 +60,7 @@ where
async fn handle_proposal(
&self,
proposal: &(impl ProposalTrait + Sync + Send + 'static),
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()> {
// Proposal will be signed by active governor/maintainer.
// Proposal will be then enqueued for execution with BridgeKey as TypedChainId
Expand All @@ -86,7 +87,7 @@ where
signature = ?hex::encode(&signature_bytes),
);
// Proposal signed metric
metrics.proposals_signed.inc();
metrics.lock().await.proposals_signed.inc();
// now all we have to do is to send the data and the signature to the signature bridge.
self.store.enqueue_item(
SledQueueKey::from_bridge_key(bridge_key),
Expand Down
3 changes: 2 additions & 1 deletion crates/proposal-signing-backends/src/proposal_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use crate::ProposalSigningBackend;
use std::sync::Arc;
use tokio::sync::Mutex;
use webb::evm::contract::protocol_solidity::v_anchor_contract;
use webb::evm::ethers::prelude::EthCall;
use webb_proposals::ProposalTrait;
Expand All @@ -22,7 +23,7 @@ use webb_relayer_utils::metric;
pub async fn handle_proposal<P>(
proposal: &(impl ProposalTrait + Sync + Send + 'static),
proposal_signing_backend: &P,
metrics: Arc<metric::Metrics>,
metrics: Arc<Mutex<metric::Metrics>>,
) -> webb_relayer_utils::Result<()>
where
P: ProposalSigningBackend,
Expand Down
9 changes: 4 additions & 5 deletions crates/relayer-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::broadcast;
use tokio::sync::{broadcast, Mutex};

#[cfg(feature = "evm")]
use webb::evm::ethers::core::k256::SecretKey;
Expand All @@ -32,8 +32,7 @@ use webb::substrate::subxt;
#[cfg(feature = "substrate")]
use webb::substrate::subxt::ext::sp_core::sr25519::Pair as Sr25519Pair;

use webb_relayer_utils::metric;
use webb_relayer_utils::metric::Metrics;
use webb_relayer_utils::metric::{self, Metrics};

/// RelayerContext contains Relayer's configuration and shutdown signal.
#[derive(Clone)]
Expand All @@ -50,14 +49,14 @@ pub struct RelayerContext {
/// safe terminal state, and completes the task.
notify_shutdown: broadcast::Sender<()>,
/// Represents the metrics for the relayer
pub metrics: Arc<metric::Metrics>,
pub metrics: Arc<Mutex<metric::Metrics>>,
}

impl RelayerContext {
/// Creates a new RelayerContext.
pub fn new(config: webb_relayer_config::WebbRelayerConfig) -> Self {
let (notify_shutdown, _) = broadcast::channel(2);
let metrics = Arc::new(Metrics::new());
let metrics = Arc::new(Mutex::new(Metrics::new()));
Self {
config,
notify_shutdown,
Expand Down
Loading

0 comments on commit 92e3239

Please sign in to comment.