Skip to content

Commit

Permalink
Integrate heartbeat monitor into metrics reporter
Browse files (browse the repository at this point in the history)
  • Loading branch information
vrmiguel committed Oct 21, 2024
1 parent 81477ef commit 6985adb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 1 deletion.
1 change: 1 addition & 0 deletions conductor/src/heartbeat_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ fn current_timestamp() -> u64 {
.as_secs()
}

#[derive(Clone)]
pub struct HeartbeatMonitor {
shared_heartbeat: Arc<AtomicU64>,
update_interval: Duration,
Expand Down
5 changes: 5 additions & 0 deletions conductor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use actix_web::{web, App, HttpServer};
use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestTracing};
use conductor::errors::ConductorError;
use conductor::heartbeat_monitor;
use conductor::monitoring::CustomMetrics;
use conductor::{
cloud::CloudProvider, create_cloudformation, create_gcp_storage_workload_identity_binding,
Expand Down Expand Up @@ -28,6 +29,7 @@ use sqlx::error::Error;
use sqlx::postgres::PgPoolOptions;
use std::env;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{thread, time};
use types::{CRUDevent, Event};

Expand Down Expand Up @@ -695,6 +697,8 @@ async fn main() -> std::io::Result<()> {
let status_reporter_enabled = from_env_default("WATCHER_ENABLED", "true");
let metrics_reported_enabled = from_env_default("METRICS_REPORTER_ENABLED", "false");

let (heartbeat_monitor, heartbeat_updater) = heartbeat_monitor::start(Duration::from_secs(60));

if conductor_enabled != "false" {
info!("Starting conductor");
background_threads_locked.push(tokio::spawn({
Expand Down Expand Up @@ -783,6 +787,7 @@ async fn main() -> std::io::Result<()> {
App::new()
.app_data(web::Data::new(custom_metrics.clone()))
.app_data(web::Data::new(background_threads.clone()))
.app_data(web::Data::new(heartbeat_monitor.clone()))
.wrap(RequestTracing::new())
.route(
"/metrics",
Expand Down
4 changes: 3 additions & 1 deletion conductor/src/metrics_reporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{bail, Context, Result};
use conductor::heartbeat_monitor::HeartbeatUpdater;
use conductor::metrics::dataplane_metrics::split_data_plane_metrics;
use conductor::metrics::{dataplane_metrics::DataPlaneMetrics, prometheus::Metrics};
use log::{error, info};
Expand Down Expand Up @@ -28,7 +29,7 @@ fn load_metric_queries() -> Result<MetricQueries> {
serde_yaml::from_str(METRICS_FILE).map_err(Into::into)
}

pub async fn run_metrics_reporter() -> Result<()> {
pub async fn run_metrics_reporter(hearbeat_updater: HeartbeatUpdater) -> Result<()> {
let client = Client::new().await;

let MetricQueries { metrics } = load_metric_queries()?;
Expand All @@ -55,6 +56,7 @@ pub async fn run_metrics_reporter() -> Result<()> {

loop {
sync_interval.tick().await;
hearbeat_updater.update_heartbeat();

let now = Instant::now();
for metric in &metrics {
Expand Down
9 changes: 9 additions & 0 deletions conductor/src/routes/health.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use actix_web::{get, web, HttpResponse, Responder};
use std::sync::{Arc, Mutex};

use crate::heartbeat_monitor::{self, HeartbeatMonitor};

#[get("/lively")]
pub async fn background_threads_running(
background_threads: web::Data<Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>>,
heartbeat_monitor: web::Data<HeartbeatMonitor>,
) -> impl Responder {
let background_threads = match background_threads.lock() {
Ok(threads) => threads,
Expand All @@ -12,6 +15,12 @@ pub async fn background_threads_running(
.body("Failed to check if background tasks are running.")
}
};

if heartbeat_monitor.is_heartbeat_active() {
return HttpResponse::InternalServerError()
.body("One or more background tasks are not responding.");
}

for thread in background_threads.iter() {
if thread.is_finished() {
return HttpResponse::InternalServerError()
Expand Down

0 comments on commit 6985adb

Please sign in to comment.