From 7a28de0915e8ff3300a55b6176c2ee40802fd993 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vin=C3=ADcius=20Miguel?= <36349314+vrmiguel@users.noreply.github.com> Date: Mon, 21 Oct 2024 14:15:02 -0300 Subject: [PATCH] Integrate heartbeat monitor into metrics reporter --- conductor/src/heartbeat_monitor.rs | 1 + conductor/src/main.rs | 7 ++++++- conductor/src/metrics_reporter.rs | 4 +++- conductor/src/routes/health.rs | 9 +++++++++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/conductor/src/heartbeat_monitor.rs b/conductor/src/heartbeat_monitor.rs index 827c33139..3c4d591ad 100644 --- a/conductor/src/heartbeat_monitor.rs +++ b/conductor/src/heartbeat_monitor.rs @@ -11,6 +11,7 @@ fn current_timestamp() -> u64 { .as_secs() } +#[derive(Clone)] pub struct HeartbeatMonitor { shared_heartbeat: Arc, update_interval: Duration, diff --git a/conductor/src/main.rs b/conductor/src/main.rs index b4af1ed5b..2c34649bd 100644 --- a/conductor/src/main.rs +++ b/conductor/src/main.rs @@ -1,6 +1,7 @@ use actix_web::{web, App, HttpServer}; use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestTracing}; use conductor::errors::ConductorError; +use conductor::heartbeat_monitor; use conductor::monitoring::CustomMetrics; use conductor::{ cloud::CloudProvider, create_cloudformation, create_gcp_storage_workload_identity_binding, @@ -28,6 +29,7 @@ use sqlx::error::Error; use sqlx::postgres::PgPoolOptions; use std::env; use std::sync::{Arc, Mutex}; +use std::time::Duration; use std::{thread, time}; use types::{CRUDevent, Event}; @@ -695,6 +697,8 @@ async fn main() -> std::io::Result<()> { let status_reporter_enabled = from_env_default("WATCHER_ENABLED", "true"); let metrics_reported_enabled = from_env_default("METRICS_REPORTER_ENABLED", "false"); + let (heartbeat_monitor, heartbeat_updater) = heartbeat_monitor::start(Duration::from_secs(60)); + if conductor_enabled != "false" { info!("Starting conductor"); background_threads_locked.push(tokio::spawn({ @@ -759,7 +763,7 @@ async fn main() -> std::io::Result<()> { let custom_metrics_copy = custom_metrics.clone(); background_threads_locked.push(tokio::spawn(async move { let custom_metrics = &custom_metrics_copy; - if let Err(err) = run_metrics_reporter().await { + if let Err(err) = run_metrics_reporter(heartbeat_updater.clone()).await { custom_metrics .conductor_errors .add(&opentelemetry::Context::current(), 1, &[]); @@ -783,6 +787,7 @@ async fn main() -> std::io::Result<()> { App::new() .app_data(web::Data::new(custom_metrics.clone())) .app_data(web::Data::new(background_threads.clone())) + .app_data(web::Data::new(heartbeat_monitor.clone())) .wrap(RequestTracing::new()) .route( "/metrics", diff --git a/conductor/src/metrics_reporter.rs b/conductor/src/metrics_reporter.rs index 623c3c382..ecfe19ecc 100644 --- a/conductor/src/metrics_reporter.rs +++ b/conductor/src/metrics_reporter.rs @@ -1,4 +1,5 @@ use anyhow::{bail, Context, Result}; +use conductor::heartbeat_monitor::HeartbeatUpdater; use conductor::metrics::dataplane_metrics::split_data_plane_metrics; use conductor::metrics::{dataplane_metrics::DataPlaneMetrics, prometheus::Metrics}; use log::{error, info}; @@ -28,7 +29,7 @@ fn load_metric_queries() -> Result { serde_yaml::from_str(METRICS_FILE).map_err(Into::into) } -pub async fn run_metrics_reporter() -> Result<()> { +pub async fn run_metrics_reporter(hearbeat_updater: HeartbeatUpdater) -> Result<()> { let client = Client::new().await; let MetricQueries { metrics } = load_metric_queries()?; @@ -55,6 +56,7 @@ pub async fn run_metrics_reporter() -> Result<()> { loop { sync_interval.tick().await; + hearbeat_updater.update_heartbeat(); let now = Instant::now(); for metric in &metrics { diff --git a/conductor/src/routes/health.rs b/conductor/src/routes/health.rs index 18190aeb3..9ff149ee0 100644 --- a/conductor/src/routes/health.rs +++ b/conductor/src/routes/health.rs @@ -1,9 +1,12 @@ use actix_web::{get, web, HttpResponse, Responder}; use std::sync::{Arc, Mutex}; +use crate::heartbeat_monitor::HeartbeatMonitor; + #[get("/lively")] pub async fn background_threads_running( background_threads: web::Data>>>>, + heartbeat_monitor: web::Data, ) -> impl Responder { let background_threads = match background_threads.lock() { Ok(threads) => threads, @@ -12,6 +15,12 @@ pub async fn background_threads_running( .body("Failed to check if background tasks are running.") } }; + + if heartbeat_monitor.is_heartbeat_active() { + return HttpResponse::InternalServerError() + .body("One or more background tasks are not responding."); + } + for thread in background_threads.iter() { if thread.is_finished() { return HttpResponse::InternalServerError()