Skip to content

Commit

Permalink
Fix disabling of VolumeSnapshots when instances are disabled (#983)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhudson authored Sep 27, 2024
1 parent 72b3c9e commit f3f2729
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 18 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ jobs:
set -xe
sudo apt-get update
sudo apt-get install -y pkg-config libssl-dev git
- uses: azure/setup-helm@v3
- uses: extractions/setup-just@v1
- uses: azure/setup-helm@v4
- uses: extractions/setup-just@v2
- name: Install kind
uses: helm/kind-action@v1.7.0
uses: helm/kind-action@v1
with:
install_only: true
- uses: dtolnay/rust-toolchain@stable
Expand Down Expand Up @@ -79,7 +79,7 @@ jobs:
# Start the operator in the background
cargo run > operator-output.txt 2>&1 &
# Run the tests
cargo test --jobs 1 -- --ignored --nocapture
cargo test -- --ignored --nocapture
- name: Debugging information
if: always()
run: |
Expand Down
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tembo-operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.50.0"
version = "0.50.1"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion tembo-operator/src/cloudnativepg/cnpg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2243,7 +2243,7 @@ pub(crate) async fn get_scheduled_backups(cdb: &CoreDB, ctx: Arc<Context>) -> Ve
let scheduled_backup: Api<ScheduledBackup> = Api::namespaced(ctx.client.clone(), &namespace);

// Create a ListParams object to filter the ScheduledBackups
let lp = ListParams::default().fields(&format!("metadata.name={}", instance_name));
let lp = ListParams::default().fields(&format!("metadata.namespace={}", instance_name));

match scheduled_backup.list(&lp).await {
Ok(list) => {
Expand Down
62 changes: 60 additions & 2 deletions tembo-operator/src/cloudnativepg/cnpg_utils.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
pub use crate::{
apis::coredb_types::CoreDB,
cloudnativepg::backups::Backup,
cloudnativepg::clusters::{Cluster, ClusterStatusConditionsStatus},
cloudnativepg::poolers::Pooler,
cloudnativepg::scheduledbackups::ScheduledBackup,
controller,
extensions::database_queries::is_not_restarting,
patch_cdb_status_merge, requeue_normal_with_jitter, Context, RESTARTED_AT,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use kube::{
api::{Patch, PatchParams},
api::{DeleteParams, ListParams, Patch, PatchParams},
runtime::controller::Action,
Api, ResourceExt,
};
Expand Down Expand Up @@ -156,6 +158,7 @@ pub async fn patch_cluster_merge(
pub async fn patch_scheduled_backup_merge(
cdb: &CoreDB,
ctx: &Arc<Context>,
backup_name: &str,
patch: serde_json::Value,
) -> Result<(), Action> {
let name = cdb.name_any();
Expand All @@ -167,7 +170,7 @@ pub async fn patch_scheduled_backup_merge(
let scheduled_backup_api: Api<ScheduledBackup> = Api::namespaced(ctx.client.clone(), namespace);
let pp = PatchParams::apply("patch_merge");
let _ = scheduled_backup_api
.patch(&name, &pp, &Patch::Merge(&patch))
.patch(backup_name, &pp, &Patch::Merge(&patch))
.await
.map_err(|e| {
error!("Error patching cluster: {}", e);
Expand Down Expand Up @@ -320,3 +323,58 @@ pub(crate) async fn is_image_updated(

Ok(())
}

// remove_stalled_backups function takes a CoreDB, Conext and removed any stalled
// backups. A backup is considered stalled if it's older than 6 hours and does not have a status set.
// If a status is missing this means that the backup was never started nor will it ever start.
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any()))]
pub(crate) async fn removed_stalled_backups(
cdb: &CoreDB,
ctx: &Arc<Context>,
) -> Result<(), Action> {
let name = cdb.name_any();
let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
error!("Namespace is empty for instance: {}.", name);
Action::requeue(Duration::from_secs(300))
})?;

let backup_api: Api<Backup> = Api::namespaced(ctx.client.clone(), namespace);

// List all backups for the cluster
let lp = ListParams {
label_selector: Some(format!("cnpg.io/cluster={}", name.as_str())),
..ListParams::default()
};
let backups = backup_api.list(&lp).await.map_err(|e| {
error!("Error listing backups: {}", e);
Action::requeue(Duration::from_secs(300))
})?;

let stalled_time = Time(chrono::Utc::now() - chrono::Duration::hours(6));

// Filter backups that do not have a status set and are older than 24 hours
for backup in &backups.items {
if backup.status.is_none() {
if let Some(creation_time) = backup.metadata.creation_timestamp.as_ref() {
if creation_time < &stalled_time {
info!("Deleting stalled backup: {}", backup.name_any());
match backup_api
.delete(&backup.name_any(), &DeleteParams::default())
.await
{
Ok(_) => {
info!("Successfully deleted stalled backup: {}", backup.name_any())
}
Err(e) => error!(
"Failed to delete stalled backup {}: {}",
backup.name_any(),
e
),
}
}
}
}
}

Ok(())
}
15 changes: 14 additions & 1 deletion tembo-operator/src/cloudnativepg/hibernate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use k8s_openapi::api::apps::v1::Deployment;
use crate::app_service::manager::get_appservice_deployment_objects;
use crate::cloudnativepg::cnpg_utils::{
get_pooler_instances, patch_cluster_merge, patch_pooler_merge, patch_scheduled_backup_merge,
removed_stalled_backups,
};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -179,6 +180,10 @@ pub async fn reconcile_cluster_hibernation(cdb: &CoreDB, ctx: &Arc<Context>) ->
name, hibernation_value
);
if cdb.spec.stop {
// Only remove stalled backups if the instance is stopped/paused
info!("Remove any stalled backups for paused instance {}", name);
removed_stalled_backups(cdb, ctx).await?;

info!("Fully reconciled stopped instance {}", name);
return Err(requeue_normal_with_jitter());
}
Expand Down Expand Up @@ -283,6 +288,7 @@ async fn update_scheduled_backups(
let scheduled_backup_value = cdb.spec.stop;

for sb in scheduled_backups {
let scheduled_backup_name = sb.metadata.name.as_deref().unwrap_or(&name);
let scheduled_backup_suspend_status = sb.spec.suspend.unwrap_or_default();

if scheduled_backup_suspend_status != scheduled_backup_value {
Expand All @@ -292,7 +298,14 @@ async fn update_scheduled_backups(
}
});

match patch_scheduled_backup_merge(cdb, ctx, patch_scheduled_backup_spec).await {
match patch_scheduled_backup_merge(
cdb,
ctx,
scheduled_backup_name,
patch_scheduled_backup_spec,
)
.await
{
Ok(_) => {
info!(
"Toggled scheduled backup suspend of {} to '{}'",
Expand Down
18 changes: 10 additions & 8 deletions tembo-operator/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ mod test {
// Assert no tables found
let result =
psql_with_retry(context.clone(), coredb_resource.clone(), "\\dt".to_string()).await;
println!("psql out: {}", result.stdout.clone().unwrap());
// println!("psql out: {}", result.stdout.clone().unwrap());
assert!(!result.stdout.clone().unwrap().contains("customers"));

let result = psql_with_retry(
Expand All @@ -1341,13 +1341,13 @@ mod test {
.to_string(),
)
.await;
println!("{}", result.stdout.clone().unwrap());
// println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("CREATE TABLE"));

// Assert table 'customers' exists
let result =
psql_with_retry(context.clone(), coredb_resource.clone(), "\\dt".to_string()).await;
println!("{}", result.stdout.clone().unwrap());
// println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("customers"));

let result = wait_until_psql_contains(
Expand All @@ -1359,14 +1359,15 @@ mod test {
)
.await;

println!("{}", result.stdout.clone().unwrap());
// println!("{}", result.stdout.clone().unwrap());
assert!(result.stdout.clone().unwrap().contains("aggs_for_vecs"));

// Check for metrics and availability
let metric_name = format!("cnpg_collector_up{{cluster=\"{}\"}} 1", name);
match wait_for_metric(pods.clone(), pod_name.to_string(), &metric_name).await {
Ok(result_stdout) => {
println!("Metric found: {}", result_stdout);
Ok(_result_stdout) => {
println!("Metric found for: {}", pod_name);
// println!("Metrics: {}", result_stdout);
}
Err(e) => {
panic!("Failed to find metric: {}", e);
Expand All @@ -1375,8 +1376,9 @@ mod test {

// Look for the custom metric
match wait_for_metric(pods.clone(), pod_name.to_string(), &test_metric_decr).await {
Ok(result_stdout) => {
println!("Metric found: {}", result_stdout);
Ok(_result_stdout) => {
println!("Metric found for: {}", pod_name);
// println!("Metrics: {}", result_stdout);
}
Err(e) => {
panic!("Failed to find metric: {}", e);
Expand Down

0 comments on commit f3f2729

Please sign in to comment.