diff --git a/rust/main/agents/relayer/src/msg/op_queue.rs b/rust/main/agents/relayer/src/msg/op_queue.rs index 41ac5b6d62..d22e0c3c83 100644 --- a/rust/main/agents/relayer/src/msg/op_queue.rs +++ b/rust/main/agents/relayer/src/msg/op_queue.rs @@ -29,12 +29,10 @@ impl OpQueue { /// it's very likely that its status has just changed, so this forces the caller to consider the new status #[instrument(skip(self), ret, fields(queue_label=%self.queue_metrics_label), level = "trace")] pub async fn push(&self, mut op: QueueOperation, new_status: Option) { - if let Some(new_status) = new_status { - op.set_status_and_update_metrics( - new_status, - Arc::new(self.get_operation_metric(op.as_ref())), - ); - } + op.set_status_and_update_metrics( + new_status, + Arc::new(self.get_operation_metric(op.as_ref())), + ); self.queue.lock().await.push(Reverse(op)); } diff --git a/rust/main/agents/relayer/src/msg/op_submitter.rs b/rust/main/agents/relayer/src/msg/op_submitter.rs index 0694595ae3..f35a991c45 100644 --- a/rust/main/agents/relayer/src/msg/op_submitter.rs +++ b/rust/main/agents/relayer/src/msg/op_submitter.rs @@ -293,6 +293,7 @@ async fn prepare_task( } PendingOperationResult::Drop => { metrics.ops_dropped.inc(); + op.decrement_metric_if_exists(); } PendingOperationResult::Confirm(reason) => { debug!(?op, "Pushing operation to confirm queue"); @@ -369,6 +370,7 @@ async fn submit_single_operation( } PendingOperationResult::Drop => { // Not expected to hit this case in `submit`, but it's here for completeness + op.decrement_metric_if_exists(); } PendingOperationResult::Success | PendingOperationResult::Confirm(_) => { confirm_op(op, confirm_queue, metrics).await @@ -457,9 +459,7 @@ async fn confirm_operation( PendingOperationResult::Success => { debug!(?op, "Operation confirmed"); metrics.ops_confirmed.inc(); - if let Some(metric) = op.get_metric() { - metric.dec() - } + op.decrement_metric_if_exists(); } PendingOperationResult::NotReady => { confirm_queue.push(op, None).await; @@ -478,6 +478,7 @@ async fn confirm_operation( } PendingOperationResult::Drop => { metrics.ops_dropped.inc(); + op.decrement_metric_if_exists(); } } operation_result diff --git a/rust/main/hyperlane-core/src/traits/pending_operation.rs b/rust/main/hyperlane-core/src/traits/pending_operation.rs index 8906777c39..f5480b197b 100644 --- a/rust/main/hyperlane-core/src/traits/pending_operation.rs +++ b/rust/main/hyperlane-core/src/traits/pending_operation.rs @@ -67,6 +67,13 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs { /// Get the metric associated with this operation. fn get_metric(&self) -> Option>; + /// Decrement the metric associated with this operation if it exists. + fn decrement_metric_if_exists(&self) { + if let Some(metric) = self.get_metric() { + metric.dec(); + } + } + /// Set the metric associated with this operation. fn set_metric(&mut self, metric: Arc); @@ -80,10 +87,12 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs { /// Set the status of the operation and update the metrics. fn set_status_and_update_metrics( &mut self, - status: PendingOperationStatus, + status: Option, new_metric: Arc, ) { - self.set_status(status); + if let Some(status) = status { + self.set_status(status); + } if let Some(old_metric) = self.get_metric() { old_metric.dec(); }