conductor: add heartbeat monitor for background workers #1023

Open · wants to merge 2 commits into main
85 changes: 85 additions & 0 deletions conductor/src/heartbeat_monitor.rs
@@ -0,0 +1,85 @@
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn current_timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("now() is not later than UNIX_EPOCH")
.as_secs()
}

#[derive(Clone)]
pub struct HeartbeatMonitor {
shared_heartbeat: Arc<AtomicU64>,
update_interval: Duration,
}

#[derive(Clone)]
pub struct HeartbeatUpdater {
shared_heartbeat: Arc<AtomicU64>,
Member Author:
Went with a lock-free approach rather than something like Arc<RwLock<Instant>>, as the issue with workers getting stuck might stem from lock contention, so adding more lock contention probably wouldn't help
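For comparison, a minimal sketch of the lock-based alternative mentioned above (hypothetical LockedHeartbeat type, not part of this PR): every updater and every health check would contend for the same lock.

```rust
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

// Hypothetical lock-based variant (not in this PR), shown only to contrast
// with the AtomicU64 approach above.
#[derive(Clone)]
struct LockedHeartbeat {
    last_update: Arc<RwLock<Instant>>,
}

impl LockedHeartbeat {
    fn update(&self) {
        // Blocks while any health check holds the read lock.
        *self.last_update.write().unwrap() = Instant::now();
    }

    fn is_active(&self, max_age: Duration) -> bool {
        // Blocks while an updater holds the write lock.
        self.last_update.read().unwrap().elapsed() < max_age
    }
}
```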

}

/// Initializes and returns both a [`HeartbeatMonitor`] and [`HeartbeatUpdater`].
pub fn start(expected_update_interval: Duration) -> (HeartbeatMonitor, HeartbeatUpdater) {
let heartbeat = Arc::new(AtomicU64::new(current_timestamp()));

let heartbeat_monitor = HeartbeatMonitor {
shared_heartbeat: heartbeat.clone(),
update_interval: expected_update_interval,
};
let heartbeat_updater = HeartbeatUpdater {
shared_heartbeat: heartbeat,
};

(heartbeat_monitor, heartbeat_updater)
}

impl HeartbeatMonitor {
/// Checks whether the heartbeat is still active.
///
/// Returns `true` if the heartbeat has been updated within twice the expected update interval, and `false` otherwise.
pub fn is_heartbeat_active(&self) -> bool {
let last_update = self.shared_heartbeat.load(Ordering::Relaxed);
let current_time = current_timestamp();

if current_time >= last_update {
let elapsed = Duration::from_secs(current_time - last_update);
elapsed < self.update_interval * 2
Member:
what is the * 2 part for?

Member Author:
To check whether there's been an update within twice the expected update interval

Member:
I figured that, but why 2x? Maybe that should just be part of the update interval config?

Keep in mind there is healthcheck config on the Kubernetes side too, like how many consecutive failed requests will restart the pod.

Member Author:
Maybe we could replace self.update_interval with timeout_interval and then just use elapsed < self.timeout_interval, what do you think?
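A rough sketch of that suggestion, assuming the HeartbeatMonitor field is renamed to timeout_interval and reusing the file's existing imports and current_timestamp helper (illustrative, not part of this diff):

```rust
// Hypothetical refactor: the caller configures the timeout directly instead
// of the monitor deriving it as update_interval * 2.
pub fn start(timeout_interval: Duration) -> (HeartbeatMonitor, HeartbeatUpdater) {
    let heartbeat = Arc::new(AtomicU64::new(current_timestamp()));
    let monitor = HeartbeatMonitor {
        shared_heartbeat: heartbeat.clone(),
        timeout_interval,
    };
    let updater = HeartbeatUpdater {
        shared_heartbeat: heartbeat,
    };
    (monitor, updater)
}

impl HeartbeatMonitor {
    pub fn is_heartbeat_active(&self) -> bool {
        let last_update = self.shared_heartbeat.load(Ordering::Relaxed);
        // checked_sub returns None if the clock went backwards; treat that as stale.
        current_timestamp()
            .checked_sub(last_update)
            .map(|secs| Duration::from_secs(secs) < self.timeout_interval)
            .unwrap_or(false)
    }
}
```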

} else {
// System time went backwards (e.g. clock drift); treat the heartbeat as stale
false
}
}
}

impl HeartbeatUpdater {
pub fn update_heartbeat(&self) {
self.shared_heartbeat
.store(current_timestamp(), Ordering::Relaxed);
}
}
Member Author:
I think it's better to keep this as something manually updated rather than being updated by yet another background thread
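For context, a minimal usage sketch of that pattern (the worker function name is made up, not part of this diff): the worker itself bumps the heartbeat once per loop iteration, so the monitor goes stale if the loop stops making progress, which is what run_metrics_reporter does further down.

```rust
use std::time::Duration;

use conductor::heartbeat_monitor::HeartbeatUpdater;

// Illustrative worker (name is hypothetical): the loop bumps the heartbeat
// once per tick, so the monitor reports it as stale if the loop ever hangs.
async fn example_worker(updater: HeartbeatUpdater) {
    let mut tick = tokio::time::interval(Duration::from_secs(60));
    loop {
        tick.tick().await;
        updater.update_heartbeat();
        // ... do one unit of work ...
    }
}
```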


#[cfg(test)]
mod tests {
use std::time::Duration;

use crate::heartbeat_monitor;

#[tokio::test]
async fn check_heartbeat_monitor() {
let (monitor, updater) = heartbeat_monitor::start(Duration::from_secs(1));

// Is alive since there's been an update in the last second
assert!(monitor.is_heartbeat_active());

tokio::time::sleep(Duration::from_secs(4)).await;

assert!(!monitor.is_heartbeat_active());
updater.update_heartbeat();

assert!(monitor.is_heartbeat_active());
}
}
1 change: 1 addition & 0 deletions conductor/src/lib.rs
@@ -3,6 +3,7 @@ pub mod cloud;
pub mod errors;
pub mod extensions;
pub mod gcp;
pub mod heartbeat_monitor;
pub mod metrics;
pub mod monitoring;
pub mod routes;
7 changes: 6 additions & 1 deletion conductor/src/main.rs
@@ -1,6 +1,7 @@
use actix_web::{web, App, HttpServer};
use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestTracing};
use conductor::errors::ConductorError;
use conductor::heartbeat_monitor;
use conductor::monitoring::CustomMetrics;
use conductor::{
cloud::CloudProvider, create_cloudformation, create_gcp_storage_workload_identity_binding,
@@ -28,6 +29,7 @@ use sqlx::error::Error;
use sqlx::postgres::PgPoolOptions;
use std::env;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{thread, time};
use types::{CRUDevent, Event};

@@ -695,6 +697,8 @@ async fn main() -> std::io::Result<()> {
let status_reporter_enabled = from_env_default("WATCHER_ENABLED", "true");
let metrics_reported_enabled = from_env_default("METRICS_REPORTER_ENABLED", "false");

let (heartbeat_monitor, heartbeat_updater) = heartbeat_monitor::start(Duration::from_secs(60));

if conductor_enabled != "false" {
info!("Starting conductor");
background_threads_locked.push(tokio::spawn({
@@ -759,7 +763,7 @@
let custom_metrics_copy = custom_metrics.clone();
background_threads_locked.push(tokio::spawn(async move {
let custom_metrics = &custom_metrics_copy;
if let Err(err) = run_metrics_reporter().await {
if let Err(err) = run_metrics_reporter(heartbeat_updater.clone()).await {
custom_metrics
.conductor_errors
.add(&opentelemetry::Context::current(), 1, &[]);
@@ -783,6 +787,7 @@
App::new()
.app_data(web::Data::new(custom_metrics.clone()))
.app_data(web::Data::new(background_threads.clone()))
.app_data(web::Data::new(heartbeat_monitor.clone()))
.wrap(RequestTracing::new())
.route(
"/metrics",
4 changes: 3 additions & 1 deletion conductor/src/metrics_reporter.rs
@@ -1,4 +1,5 @@
use anyhow::{bail, Context, Result};
use conductor::heartbeat_monitor::HeartbeatUpdater;
use conductor::metrics::dataplane_metrics::split_data_plane_metrics;
use conductor::metrics::{dataplane_metrics::DataPlaneMetrics, prometheus::Metrics};
use log::{error, info};
@@ -28,7 +29,7 @@ fn load_metric_queries() -> Result<MetricQueries> {
serde_yaml::from_str(METRICS_FILE).map_err(Into::into)
}

pub async fn run_metrics_reporter() -> Result<()> {
pub async fn run_metrics_reporter(heartbeat_updater: HeartbeatUpdater) -> Result<()> {
let client = Client::new().await;

let MetricQueries { metrics } = load_metric_queries()?;
@@ -55,6 +56,7 @@ pub async fn run_metrics_reporter() -> Result<()> {

loop {
sync_interval.tick().await;
heartbeat_updater.update_heartbeat();

let now = Instant::now();
for metric in &metrics {
14 changes: 13 additions & 1 deletion conductor/src/routes/health.rs
@@ -1,9 +1,15 @@
use actix_web::{get, web, HttpResponse, Responder};
use std::sync::{Arc, Mutex};
use std::{
ops::Not,
sync::{Arc, Mutex},
};

use crate::heartbeat_monitor::HeartbeatMonitor;

#[get("/lively")]
pub async fn background_threads_running(
background_threads: web::Data<Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>>,
heartbeat_monitor: web::Data<HeartbeatMonitor>,
) -> impl Responder {
let background_threads = match background_threads.lock() {
Ok(threads) => threads,
@@ -12,6 +18,12 @@ pub async fn background_threads_running(
.body("Failed to check if background tasks are running.")
}
};

if heartbeat_monitor.is_heartbeat_active().not() {
return HttpResponse::InternalServerError()
.body("One or more background tasks are not responding.");
}

for thread in background_threads.iter() {
if thread.is_finished() {
return HttpResponse::InternalServerError()